From b2999e046035bb74f93639d59f2f963a19b119ba Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 30 Jul 2025 17:46:57 +1000 Subject: [PATCH] Aggregate connection stats by peer implementation --- atomic-count.go | 6 ++-- client-stats.go | 2 +- client.go | 9 ++--- conn-stats.go | 8 +++-- env.go | 1 + misc_test.go | 18 ---------- peer-impl.go | 1 + peer.go | 85 +++++++++++++++++++----------------------------- peerconn.go | 33 +++++++++++++++++-- torrent-stats.go | 17 ++++++++-- torrent.go | 78 +++++++++++++++++++------------------------- webseed-peer.go | 4 +++ 12 files changed, 135 insertions(+), 127 deletions(-) create mode 100644 env.go diff --git a/atomic-count.go b/atomic-count.go index 415f427a..6a18b8ff 100644 --- a/atomic-count.go +++ b/atomic-count.go @@ -31,9 +31,11 @@ func (me *Count) MarshalJSON() ([]byte, error) { // TODO: Can this use more generics to speed it up? Should we be checking the field types? func copyCountFields[T any](src *T) (dst T) { + srcValue := reflect.ValueOf(src).Elem() + dstValue := reflect.ValueOf(&dst).Elem() for i := 0; i < reflect.TypeFor[T]().NumField(); i++ { - n := reflect.ValueOf(src).Elem().Field(i).Addr().Interface().(*Count).Int64() - reflect.ValueOf(&dst).Elem().Field(i).Addr().Interface().(*Count).Add(n) + n := srcValue.Field(i).Addr().Interface().(*Count).Int64() + dstValue.Field(i).Addr().Interface().(*Count).Add(n) } return } diff --git a/client-stats.go b/client-stats.go index fb080dbb..48711e78 100644 --- a/client-stats.go +++ b/client-stats.go @@ -39,7 +39,7 @@ type ClientStats struct { } func (cl *Client) statsLocked() (stats ClientStats) { - stats.ConnStats = copyCountFields(&cl.connStats) + stats.AllConnStats = cl.connStats.Copy() stats.TorrentStatCounters = copyCountFields(&cl.counters) for t := range cl.torrents { stats.TorrentGauges.Add(t.gauges()) diff --git a/client.go b/client.go index b4857a32..9a62e40d 100644 --- a/client.go +++ b/client.go @@ -59,7 +59,7 @@ const webseedRequestUpdateTimerInterval = time.Second type Client struct { // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of // fields. See #262. - connStats ConnStats + connStats AllConnStats counters TorrentStatCounters _mu lockWithDeferreds @@ -1292,7 +1292,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerCon piece := d.Piece switch d.Type { case pp.DataMetadataExtensionMsgType: - c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead })) + c.modifyRelevantConnStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead })) if !c.requestedMetadataPiece(piece) { return fmt.Errorf("got unexpected piece %d", piece) } @@ -1706,6 +1706,7 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon } c = &PeerConn{ Peer: Peer{ + cl: cl, outgoing: opts.outgoing, choking: true, peerChoking: true, @@ -1917,9 +1918,9 @@ func (cl *Client) ICEServers() []webrtc.ICEServer { } // Returns connection-level aggregate connStats at the Client level. See the comment on -// TorrentStats.ConnStats. +// TorrentStats.ConnStats. You probably want Client.Stats() instead. func (cl *Client) ConnStats() ConnStats { - return cl.connStats.Copy() + return cl.connStats.ConnStats.Copy() } func (cl *Client) Stats() ClientStats { diff --git a/conn-stats.go b/conn-stats.go index 12e00287..cb51a1b0 100644 --- a/conn-stats.go +++ b/conn-stats.go @@ -55,12 +55,16 @@ func (cs *ConnStats) receivedChunk(size int64) { cs.BytesReadData.Add(size) } -func (cs *ConnStats) incrementPiecesDirtiedGood() { +func (cs *ConnStats) incrementPiecesDirtiedGood() bool { cs.PiecesDirtiedGood.Add(1) + // This method is used as an iterator and should never return early. + return true } -func (cs *ConnStats) incrementPiecesDirtiedBad() { +func (cs *ConnStats) incrementPiecesDirtiedBad() bool { cs.PiecesDirtiedBad.Add(1) + // This method is used as an iterator and should never return early. + return true } func add(n int64, f func(*ConnStats) *Count) func(*ConnStats) { diff --git a/env.go b/env.go new file mode 100644 index 00000000..10cbafc7 --- /dev/null +++ b/env.go @@ -0,0 +1 @@ +package torrent diff --git a/misc_test.go b/misc_test.go index d8c0c7aa..d81a1f26 100644 --- a/misc_test.go +++ b/misc_test.go @@ -5,8 +5,6 @@ import ( "strings" "testing" - "github.com/anacrolix/missinggo/iter" - "github.com/anacrolix/missinggo/v2/bitmap" "github.com/davecgh/go-spew/spew" "github.com/stretchr/testify/assert" ) @@ -23,22 +21,6 @@ func TestTorrentOffsetRequest(t *testing.T) { check(13, 5, 13, Request{}, false) } -func BenchmarkIterBitmapsDistinct(t *testing.B) { - t.ReportAllocs() - for i := 0; i < t.N; i += 1 { - var skip, first, second bitmap.Bitmap - skip.Add(1) - first.Add(1, 0, 3) - second.Add(1, 2, 0) - skipCopy := skip.Copy() - t.StartTimer() - output := iter.ToSlice(iterBitmapsDistinct(&skipCopy, first, second)) - t.StopTimer() - assert.Equal(t, []interface{}{0, 3, 2}, output) - assert.Equal(t, []bitmap.BitIndex{1}, skip.ToSortedSlice()) - } -} - func TestSpewConnStats(t *testing.T) { s := spew.Sdump(ConnStats{}) t.Logf("\n%s", s) diff --git a/peer-impl.go b/peer-impl.go index aa0fe9fe..27be7b06 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -53,4 +53,5 @@ type newHotPeerImpl interface { // Whether we're expecting to receive chunks because we have outstanding requests. Used for // example to calculate download rate. expectingChunks() bool + allConnStatsImplField(*AllConnStats) *ConnStats } diff --git a/peer.go b/peer.go index 7dcf6503..939dfba6 100644 --- a/peer.go +++ b/peer.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "iter" "log/slog" "net" "strings" @@ -14,11 +15,9 @@ import ( "github.com/anacrolix/chansync" . "github.com/anacrolix/generics" "github.com/anacrolix/log" - "github.com/anacrolix/missinggo/iter" "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/missinggo/v2/panicif" "github.com/anacrolix/multiless" - "github.com/anacrolix/torrent/internal/alloclim" "github.com/anacrolix/torrent/mse" pp "github.com/anacrolix/torrent/peer_protocol" @@ -31,7 +30,8 @@ type ( // First to ensure 64-bit alignment for atomics. See #262. _stats ConnStats - t *Torrent + cl *Client + t *Torrent legacyPeerImpl peerImpl newHotPeerImpl @@ -64,9 +64,6 @@ type ( // True if the connection is operating over MSE obfuscation. headerEncrypted bool cryptoMethod mse.CryptoMethod - // Set true after we've added our ConnStats generated during handshake to - // other ConnStat instances as determined when the *Torrent became known. - reconciledHandshakeStats bool lastMessageReceived time.Time completedHandshake time.Time @@ -289,49 +286,31 @@ func (cn *Peer) totalExpectingTime() (ret time.Duration) { // are okay. type messageWriter func(pp.Message) bool -// Emits the indices in the Bitmaps bms in order, never repeating any index. -// skip is mutated during execution, and its initial values will never be -// emitted. -func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func { - return func(cb iter.Callback) { - for _, bm := range bms { - if !iter.All( - func(_i interface{}) bool { - i := _i.(int) - if skip.Contains(bitmap.BitIndex(i)) { - return true - } - skip.Add(bitmap.BitIndex(i)) - return cb(i) - }, - bm.Iter, - ) { - return - } - } +// All ConnStats that include this connection. Some objects are not known until the handshake is +// complete, after which it's expected to reconcile the differences. +func (cn *Peer) modifyRelevantConnStats(f func(*ConnStats)) { + // Every peer has basic ConnStats for now. + f(&cn._stats) + incAll := func(stats *ConnStats) bool { + f(stats) + return true } + cn.upstreamConnStats()(incAll) } -// After handshake, we know what Torrent and Client stats to include for a -// connection. -func (cn *Peer) postHandshakeStats(f func(*ConnStats)) { - t := cn.t - f(&t.connStats) - f(&t.cl.connStats) -} - -// All ConnStats that include this connection. Some objects are not known -// until the handshake is complete, after which it's expected to reconcile the -// differences. -func (cn *Peer) allStats(f func(*ConnStats)) { - f(&cn._stats) - if cn.reconciledHandshakeStats { - cn.postHandshakeStats(f) +// Yields relevant upstream ConnStats. Skips Torrent if it isn't set. +func (cn *Peer) upstreamConnStats() iter.Seq[*ConnStats] { + return func(yield func(*ConnStats) bool) { + // PeerConn can be nil when it hasn't completed handshake. + if cn.t != nil { + cn.relevantConnStats(&cn.t.connStats)(yield) + } + cn.relevantConnStats(&cn.cl.connStats)(yield) } } func (cn *Peer) readBytes(n int64) { - cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead })) + cn.modifyRelevantConnStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead })) } func (c *Peer) lastHelpful() (ret time.Time) { @@ -363,7 +342,7 @@ func runSafeExtraneous(f func()) { } func (c *Peer) doChunkReadStats(size int64) { - c.allStats(func(cs *ConnStats) { cs.receivedChunk(size) }) + c.modifyRelevantConnStats(func(cs *ConnStats) { cs.receivedChunk(size) }) } // Handle a received chunk from a peer. TODO: Break this out into non-wire protocol specific @@ -403,16 +382,16 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { if t.haveChunk(ppReq) { // panic(fmt.Sprintf("%+v", ppReq)) ChunksReceived.Add("redundant", 1) - c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted })) + c.modifyRelevantConnStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted })) return nil } piece := t.piece(ppReq.Index.Int()) - c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful })) - c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData })) + c.modifyRelevantConnStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful })) + c.modifyRelevantConnStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData })) if intended { - c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData })) + c.modifyRelevantConnStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData })) } for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData { f(ReceivedUsefulDataEvent{c, msg}) @@ -548,10 +527,6 @@ func (cn *Peer) newPeerPieces() *roaring.Bitmap { return ret } -func (cn *Peer) stats() *ConnStats { - return &cn._stats -} - func (p *Peer) TryAsPeerConn() (*PeerConn, bool) { pc, ok := p.legacyPeerImpl.(*PeerConn) return pc, ok @@ -579,3 +554,11 @@ func (p *Peer) initClosedCtx() { panicif.NotNil(p.closedCtx) p.closedCtx, p.closedCtxCancel = context.WithCancel(p.t.closedCtx) } + +// Iterates base and peer-impl specific ConnStats from all. +func (p *Peer) relevantConnStats(all *AllConnStats) iter.Seq[*ConnStats] { + return func(yield func(*ConnStats) bool) { + yield(&all.ConnStats) + yield(p.peerImpl.allConnStatsImplField(all)) + } +} diff --git a/peerconn.go b/peerconn.go index e0c9b363..bbab8eb8 100644 --- a/peerconn.go +++ b/peerconn.go @@ -114,6 +114,13 @@ type PeerConn struct { receivedHashPieces map[[32]byte][][32]byte peerRequestServerRunning bool + // Set true after we've added our ConnStats generated during handshake to other ConnStat + // instances as determined when the *Torrent became known. + reconciledHandshakeStats bool +} + +func (*PeerConn) allConnStatsImplField(stats *AllConnStats) *ConnStats { + return &stats.PeerConns } func (cn *PeerConn) lastWriteUploadRate() float64 { @@ -584,11 +591,11 @@ func (cn *PeerConn) wroteMsg(msg *pp.Message) { torrent.Add(fmt.Sprintf("Extended messages written for protocol %q", name), 1) } } - cn.allStats(func(cs *ConnStats) { cs.wroteMsg(msg) }) + cn.modifyRelevantConnStats(func(cs *ConnStats) { cs.wroteMsg(msg) }) } func (cn *PeerConn) wroteBytes(n int64) { - cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten })) + cn.modifyRelevantConnStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten })) } func (c *PeerConn) fastEnabled() bool { @@ -1186,7 +1193,7 @@ func (c *PeerConn) setTorrent(t *Torrent) { c.initClosedCtx() c.logger.WithDefaultLevel(log.Debug).Printf("set torrent=%v", t) c.setPeerLoggers(t.logger, t.slogger()) - t.reconcileHandshakeStats(c.peerPtr()) + c.reconcileHandshakeStats() } func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags { @@ -1870,3 +1877,23 @@ func (c *PeerConn) checkReceivedChunk(req RequestIndex, msg *pp.Message, ppReq R return } + +// Reconcile bytes transferred before connection was associated with a torrent. +func (c *PeerConn) reconcileHandshakeStats() { + panicif.True(c.reconciledHandshakeStats) + if c._stats != (ConnStats{ + // Handshakes should only increment these fields: + BytesWritten: c._stats.BytesWritten, + BytesRead: c._stats.BytesRead, + }) { + panic("bad stats") + } + // Add the stat data so far to relevant Torrent stats that were skipped before the handshake + // completed. + c.relevantConnStats(&c.t.connStats)(func(cs *ConnStats) bool { + cs.BytesRead.Add(c._stats.BytesRead.Int64()) + cs.BytesWritten.Add(c._stats.BytesWritten.Int64()) + return true + }) + c.reconciledHandshakeStats = true +} diff --git a/torrent-stats.go b/torrent-stats.go index 99920607..2f67ac29 100644 --- a/torrent-stats.go +++ b/torrent-stats.go @@ -7,11 +7,24 @@ import ( // Due to ConnStats, may require special alignment on some platforms. See // https://github.com/anacrolix/torrent/issues/383. type TorrentStats struct { + AllConnStats + TorrentStatCounters + TorrentGauges +} + +type AllConnStats struct { // Aggregates stats over all connections past and present. Some values may not have much meaning // in the aggregate context. ConnStats - TorrentStatCounters - TorrentGauges + WebSeeds ConnStats + PeerConns ConnStats +} + +func (me *AllConnStats) Copy() (ret AllConnStats) { + ret.ConnStats = me.ConnStats.Copy() + ret.WebSeeds = me.WebSeeds.Copy() + ret.PeerConns = me.PeerConns.Copy() + return } // Instantaneous metrics in Torrents, and aggregated for Clients. diff --git a/torrent.go b/torrent.go index 773d25f1..3960d7d4 100644 --- a/torrent.go +++ b/torrent.go @@ -67,7 +67,7 @@ var errTorrentClosed = errors.New("torrent closed") type Torrent struct { // Torrent-level aggregate statistics. First in struct to ensure 64-bit // alignment. See #262. - connStats ConnStats + connStats AllConnStats counters TorrentStatCounters cl *Client @@ -2322,7 +2322,7 @@ func (t *Torrent) gauges() (ret TorrentGauges) { } func (t *Torrent) statsLocked() (ret TorrentStats) { - ret.ConnStats = copyCountFields(&t.connStats) + ret.AllConnStats = t.connStats.Copy() ret.TorrentStatCounters = copyCountFields(&t.counters) ret.TorrentGauges = t.gauges() return @@ -2348,23 +2348,6 @@ func (t *Torrent) numTotalPeers() int { return len(peers) } -// Reconcile bytes transferred before connection was associated with a -// torrent. -func (t *Torrent) reconcileHandshakeStats(c *Peer) { - if c._stats != (ConnStats{ - // Handshakes should only increment these fields: - BytesWritten: c._stats.BytesWritten, - BytesRead: c._stats.BytesRead, - }) { - panic("bad stats") - } - c.postHandshakeStats(func(cs *ConnStats) { - cs.BytesRead.Add(c._stats.BytesRead.Int64()) - cs.BytesWritten.Add(c._stats.BytesWritten.Int64()) - }) - c.reconciledHandshakeStats = true -} - // Returns true if the connection is added. func (t *Torrent) addPeerConn(c *PeerConn) (err error) { defer func() { @@ -2518,13 +2501,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { }() if passed { - if len(p.dirtiers) != 0 { - // Don't increment stats above connection-level for every involved connection. - t.allStats((*ConnStats).incrementPiecesDirtiedGood) - } - for c := range p.dirtiers { - c._stats.incrementPiecesDirtiedGood() - } + t.incrementPiecesDirtiedStats(p, (*ConnStats).incrementPiecesDirtiedGood) t.clearPieceTouchers(piece) hasDirty := p.hasDirtyChunks() t.cl.unlock() @@ -2546,13 +2523,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { if len(p.dirtiers) != 0 && p.allChunksDirty() && hashIoErr == nil { // Peers contributed to all the data for this piece hash failure, and the failure was // not due to errors in the storage (such as data being dropped in a cache). - - // Increment Torrent and above stats, and then specific connections. - t.allStats((*ConnStats).incrementPiecesDirtiedBad) - for c := range p.dirtiers { - // Y u do dis peer?! - c.stats().incrementPiecesDirtiedBad() - } + t.incrementPiecesDirtiedStats(p, (*ConnStats).incrementPiecesDirtiedBad) bannableTouchers := make([]*Peer, 0, len(p.dirtiers)) for c := range p.dirtiers { @@ -2919,13 +2890,6 @@ func (t *Torrent) AddClientPeer(cl *Client) int { }()) } -// All stats that include this Torrent. Useful when we want to increment ConnStats but not for every -// connection. -func (t *Torrent) allStats(f func(*ConnStats)) { - f(&t.connStats) - f(&t.cl.connStats) -} - func (t *Torrent) hashingPiece(i pieceIndex) bool { return t.pieces[i].hashing } @@ -3066,10 +3030,10 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) bool { const defaultMaxRequests = 2 ws := webseedPeer{ peer: Peer{ - t: t, - outgoing: true, - Network: "http", - reconciledHandshakeStats: true, + cl: t.cl, + t: t, + outgoing: true, + Network: "http", // TODO: Set ban prefix? RemoteAddr: remoteAddrFromUrl(url), callbacks: t.callbacks(), @@ -3618,3 +3582,29 @@ func (t *Torrent) hasActiveWebseedRequests() bool { } return false } + +// Increment pieces dirtied for conns and aggregate upstreams. +func (t *Torrent) incrementPiecesDirtiedStats(p *Piece, inc func(stats *ConnStats) bool) { + if len(p.dirtiers) == 0 { + // Avoid allocating map. + return + } + // 4 == 2 peerImpls (PeerConn and webseedPeer) and 1 base * one AllConnStats for each of Torrent + // and Client. + distinctUpstreamConnStats := make(map[*ConnStats]struct{}, 6) + for c := range p.dirtiers { + // Apply directly for each peer to avoid allocation. + inc(&c._stats) + // Collect distinct upstream connection stats. + count := 0 + for cs := range c.upstreamConnStats() { + distinctUpstreamConnStats[cs] = struct{}{} + count++ + } + // All dirtiers should have both Torrent and Client stats for both base and impl-ConnStats. + panicif.NotEq(count, 4) + } + // TODO: Have a debug assert/dev logging version of this. + panicif.GreaterThan(len(distinctUpstreamConnStats), 6) + maps.Keys(distinctUpstreamConnStats)(inc) +} diff --git a/webseed-peer.go b/webseed-peer.go index 35fd8ffe..9b64680f 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -32,6 +32,10 @@ type webseedPeer struct { hostKey webseedHostKeyHandle } +func (*webseedPeer) allConnStatsImplField(stats *AllConnStats) *ConnStats { + return &stats.WebSeeds +} + func (me *webseedPeer) cancelAllRequests() { // Is there any point to this? Won't we fail to receive a chunk and cancel anyway? Should we // Close requests instead? -- 2.51.0