]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Aggregate connection stats by peer implementation
authorMatt Joiner <anacrolix@gmail.com>
Wed, 30 Jul 2025 07:46:57 +0000 (17:46 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 30 Jul 2025 07:46:57 +0000 (17:46 +1000)
12 files changed:
atomic-count.go
client-stats.go
client.go
conn-stats.go
env.go [new file with mode: 0644]
misc_test.go
peer-impl.go
peer.go
peerconn.go
torrent-stats.go
torrent.go
webseed-peer.go

index 415f427a6d5f2496f9f0b47d635e6b1063022a2b..6a18b8ff3b7bf9d745f2cd3eb642fbb1c7427e6f 100644 (file)
@@ -31,9 +31,11 @@ func (me *Count) MarshalJSON() ([]byte, error) {
 
 // TODO: Can this use more generics to speed it up? Should we be checking the field types?
 func copyCountFields[T any](src *T) (dst T) {
+       srcValue := reflect.ValueOf(src).Elem()
+       dstValue := reflect.ValueOf(&dst).Elem()
        for i := 0; i < reflect.TypeFor[T]().NumField(); i++ {
-               n := reflect.ValueOf(src).Elem().Field(i).Addr().Interface().(*Count).Int64()
-               reflect.ValueOf(&dst).Elem().Field(i).Addr().Interface().(*Count).Add(n)
+               n := srcValue.Field(i).Addr().Interface().(*Count).Int64()
+               dstValue.Field(i).Addr().Interface().(*Count).Add(n)
        }
        return
 }
index fb080dbb2bf87530e845d17e809aa287041a8d75..48711e781dfb983c48c52ab87b5a634b23af1c40 100644 (file)
@@ -39,7 +39,7 @@ type ClientStats struct {
 }
 
 func (cl *Client) statsLocked() (stats ClientStats) {
-       stats.ConnStats = copyCountFields(&cl.connStats)
+       stats.AllConnStats = cl.connStats.Copy()
        stats.TorrentStatCounters = copyCountFields(&cl.counters)
        for t := range cl.torrents {
                stats.TorrentGauges.Add(t.gauges())
index b4857a32ac058583d854bf21f88859bffe00b7b9..9a62e40d92766ae3c880e609f3ac839ee64c094f 100644 (file)
--- a/client.go
+++ b/client.go
@@ -59,7 +59,7 @@ const webseedRequestUpdateTimerInterval = time.Second
 type Client struct {
        // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
        // fields. See #262.
-       connStats ConnStats
+       connStats AllConnStats
        counters  TorrentStatCounters
 
        _mu    lockWithDeferreds
@@ -1292,7 +1292,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerCon
        piece := d.Piece
        switch d.Type {
        case pp.DataMetadataExtensionMsgType:
-               c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
+               c.modifyRelevantConnStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
                if !c.requestedMetadataPiece(piece) {
                        return fmt.Errorf("got unexpected piece %d", piece)
                }
@@ -1706,6 +1706,7 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon
        }
        c = &PeerConn{
                Peer: Peer{
+                       cl:          cl,
                        outgoing:    opts.outgoing,
                        choking:     true,
                        peerChoking: true,
@@ -1917,9 +1918,9 @@ func (cl *Client) ICEServers() []webrtc.ICEServer {
 }
 
 // Returns connection-level aggregate connStats at the Client level. See the comment on
-// TorrentStats.ConnStats.
+// TorrentStats.ConnStats. You probably want Client.Stats() instead.
 func (cl *Client) ConnStats() ConnStats {
-       return cl.connStats.Copy()
+       return cl.connStats.ConnStats.Copy()
 }
 
 func (cl *Client) Stats() ClientStats {
index 12e00287686a1f3c64728c5437504a2f9a392d12..cb51a1b0cc67fcca934791e0dd1f17674c73e8f2 100644 (file)
@@ -55,12 +55,16 @@ func (cs *ConnStats) receivedChunk(size int64) {
        cs.BytesReadData.Add(size)
 }
 
-func (cs *ConnStats) incrementPiecesDirtiedGood() {
+func (cs *ConnStats) incrementPiecesDirtiedGood() bool {
        cs.PiecesDirtiedGood.Add(1)
+       // This method is used as an iterator and should never return early.
+       return true
 }
 
-func (cs *ConnStats) incrementPiecesDirtiedBad() {
+func (cs *ConnStats) incrementPiecesDirtiedBad() bool {
        cs.PiecesDirtiedBad.Add(1)
+       // This method is used as an iterator and should never return early.
+       return true
 }
 
 func add(n int64, f func(*ConnStats) *Count) func(*ConnStats) {
diff --git a/env.go b/env.go
new file mode 100644 (file)
index 0000000..10cbafc
--- /dev/null
+++ b/env.go
@@ -0,0 +1 @@
+package torrent
index d8c0c7aab2850b0abc14c49e2f00102ac1397d16..d81a1f26d8d39756707d2b285096bb0adc74abe7 100644 (file)
@@ -5,8 +5,6 @@ import (
        "strings"
        "testing"
 
-       "github.com/anacrolix/missinggo/iter"
-       "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/davecgh/go-spew/spew"
        "github.com/stretchr/testify/assert"
 )
@@ -23,22 +21,6 @@ func TestTorrentOffsetRequest(t *testing.T) {
        check(13, 5, 13, Request{}, false)
 }
 
-func BenchmarkIterBitmapsDistinct(t *testing.B) {
-       t.ReportAllocs()
-       for i := 0; i < t.N; i += 1 {
-               var skip, first, second bitmap.Bitmap
-               skip.Add(1)
-               first.Add(1, 0, 3)
-               second.Add(1, 2, 0)
-               skipCopy := skip.Copy()
-               t.StartTimer()
-               output := iter.ToSlice(iterBitmapsDistinct(&skipCopy, first, second))
-               t.StopTimer()
-               assert.Equal(t, []interface{}{0, 3, 2}, output)
-               assert.Equal(t, []bitmap.BitIndex{1}, skip.ToSortedSlice())
-       }
-}
-
 func TestSpewConnStats(t *testing.T) {
        s := spew.Sdump(ConnStats{})
        t.Logf("\n%s", s)
index aa0fe9feae101d5a1e9b459804d9342598eabfcc..27be7b0648657a0aee285b0c9406b7deb9f30259 100644 (file)
@@ -53,4 +53,5 @@ type newHotPeerImpl interface {
        // Whether we're expecting to receive chunks because we have outstanding requests. Used for
        // example to calculate download rate.
        expectingChunks() bool
+       allConnStatsImplField(*AllConnStats) *ConnStats
 }
diff --git a/peer.go b/peer.go
index 7dcf6503918e8a095ba0c40d3efc86446ecc677a..939dfba6c00a9b188e02cf0a0a0863ebfa9ea0e8 100644 (file)
--- a/peer.go
+++ b/peer.go
@@ -4,6 +4,7 @@ import (
        "context"
        "fmt"
        "io"
+       "iter"
        "log/slog"
        "net"
        "strings"
@@ -14,11 +15,9 @@ import (
        "github.com/anacrolix/chansync"
        . "github.com/anacrolix/generics"
        "github.com/anacrolix/log"
-       "github.com/anacrolix/missinggo/iter"
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/missinggo/v2/panicif"
        "github.com/anacrolix/multiless"
-
        "github.com/anacrolix/torrent/internal/alloclim"
        "github.com/anacrolix/torrent/mse"
        pp "github.com/anacrolix/torrent/peer_protocol"
@@ -31,7 +30,8 @@ type (
                // First to ensure 64-bit alignment for atomics. See #262.
                _stats ConnStats
 
-               t *Torrent
+               cl *Client
+               t  *Torrent
 
                legacyPeerImpl
                peerImpl  newHotPeerImpl
@@ -64,9 +64,6 @@ type (
                // True if the connection is operating over MSE obfuscation.
                headerEncrypted bool
                cryptoMethod    mse.CryptoMethod
-               // Set true after we've added our ConnStats generated during handshake to
-               // other ConnStat instances as determined when the *Torrent became known.
-               reconciledHandshakeStats bool
 
                lastMessageReceived time.Time
                completedHandshake  time.Time
@@ -289,49 +286,31 @@ func (cn *Peer) totalExpectingTime() (ret time.Duration) {
 // are okay.
 type messageWriter func(pp.Message) bool
 
-// Emits the indices in the Bitmaps bms in order, never repeating any index.
-// skip is mutated during execution, and its initial values will never be
-// emitted.
-func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
-       return func(cb iter.Callback) {
-               for _, bm := range bms {
-                       if !iter.All(
-                               func(_i interface{}) bool {
-                                       i := _i.(int)
-                                       if skip.Contains(bitmap.BitIndex(i)) {
-                                               return true
-                                       }
-                                       skip.Add(bitmap.BitIndex(i))
-                                       return cb(i)
-                               },
-                               bm.Iter,
-                       ) {
-                               return
-                       }
-               }
+// All ConnStats that include this connection. Some objects are not known until the handshake is
+// complete, after which it's expected to reconcile the differences.
+func (cn *Peer) modifyRelevantConnStats(f func(*ConnStats)) {
+       // Every peer has basic ConnStats for now.
+       f(&cn._stats)
+       incAll := func(stats *ConnStats) bool {
+               f(stats)
+               return true
        }
+       cn.upstreamConnStats()(incAll)
 }
 
-// After handshake, we know what Torrent and Client stats to include for a
-// connection.
-func (cn *Peer) postHandshakeStats(f func(*ConnStats)) {
-       t := cn.t
-       f(&t.connStats)
-       f(&t.cl.connStats)
-}
-
-// All ConnStats that include this connection. Some objects are not known
-// until the handshake is complete, after which it's expected to reconcile the
-// differences.
-func (cn *Peer) allStats(f func(*ConnStats)) {
-       f(&cn._stats)
-       if cn.reconciledHandshakeStats {
-               cn.postHandshakeStats(f)
+// Yields relevant upstream ConnStats. Skips Torrent if it isn't set.
+func (cn *Peer) upstreamConnStats() iter.Seq[*ConnStats] {
+       return func(yield func(*ConnStats) bool) {
+               // PeerConn can be nil when it hasn't completed handshake.
+               if cn.t != nil {
+                       cn.relevantConnStats(&cn.t.connStats)(yield)
+               }
+               cn.relevantConnStats(&cn.cl.connStats)(yield)
        }
 }
 
 func (cn *Peer) readBytes(n int64) {
-       cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead }))
+       cn.modifyRelevantConnStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead }))
 }
 
 func (c *Peer) lastHelpful() (ret time.Time) {
@@ -363,7 +342,7 @@ func runSafeExtraneous(f func()) {
 }
 
 func (c *Peer) doChunkReadStats(size int64) {
-       c.allStats(func(cs *ConnStats) { cs.receivedChunk(size) })
+       c.modifyRelevantConnStats(func(cs *ConnStats) { cs.receivedChunk(size) })
 }
 
 // Handle a received chunk from a peer. TODO: Break this out into non-wire protocol specific
@@ -403,16 +382,16 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        if t.haveChunk(ppReq) {
                // panic(fmt.Sprintf("%+v", ppReq))
                ChunksReceived.Add("redundant", 1)
-               c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
+               c.modifyRelevantConnStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
                return nil
        }
 
        piece := t.piece(ppReq.Index.Int())
 
-       c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
-       c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
+       c.modifyRelevantConnStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
+       c.modifyRelevantConnStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
        if intended {
-               c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
+               c.modifyRelevantConnStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
        }
        for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData {
                f(ReceivedUsefulDataEvent{c, msg})
@@ -548,10 +527,6 @@ func (cn *Peer) newPeerPieces() *roaring.Bitmap {
        return ret
 }
 
-func (cn *Peer) stats() *ConnStats {
-       return &cn._stats
-}
-
 func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
        pc, ok := p.legacyPeerImpl.(*PeerConn)
        return pc, ok
@@ -579,3 +554,11 @@ func (p *Peer) initClosedCtx() {
        panicif.NotNil(p.closedCtx)
        p.closedCtx, p.closedCtxCancel = context.WithCancel(p.t.closedCtx)
 }
+
+// Iterates base and peer-impl specific ConnStats from all.
+func (p *Peer) relevantConnStats(all *AllConnStats) iter.Seq[*ConnStats] {
+       return func(yield func(*ConnStats) bool) {
+               yield(&all.ConnStats)
+               yield(p.peerImpl.allConnStatsImplField(all))
+       }
+}
index e0c9b3635f4bc9b7db34d552b8cf7f55784e0e7b..bbab8eb8020557d8cac5698b0b7741c52830a70a 100644 (file)
@@ -114,6 +114,13 @@ type PeerConn struct {
        receivedHashPieces map[[32]byte][][32]byte
 
        peerRequestServerRunning bool
+       // Set true after we've added our ConnStats generated during handshake to other ConnStat
+       // instances as determined when the *Torrent became known.
+       reconciledHandshakeStats bool
+}
+
+func (*PeerConn) allConnStatsImplField(stats *AllConnStats) *ConnStats {
+       return &stats.PeerConns
 }
 
 func (cn *PeerConn) lastWriteUploadRate() float64 {
@@ -584,11 +591,11 @@ func (cn *PeerConn) wroteMsg(msg *pp.Message) {
                        torrent.Add(fmt.Sprintf("Extended messages written for protocol %q", name), 1)
                }
        }
-       cn.allStats(func(cs *ConnStats) { cs.wroteMsg(msg) })
+       cn.modifyRelevantConnStats(func(cs *ConnStats) { cs.wroteMsg(msg) })
 }
 
 func (cn *PeerConn) wroteBytes(n int64) {
-       cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten }))
+       cn.modifyRelevantConnStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten }))
 }
 
 func (c *PeerConn) fastEnabled() bool {
@@ -1186,7 +1193,7 @@ func (c *PeerConn) setTorrent(t *Torrent) {
        c.initClosedCtx()
        c.logger.WithDefaultLevel(log.Debug).Printf("set torrent=%v", t)
        c.setPeerLoggers(t.logger, t.slogger())
-       t.reconcileHandshakeStats(c.peerPtr())
+       c.reconcileHandshakeStats()
 }
 
 func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags {
@@ -1870,3 +1877,23 @@ func (c *PeerConn) checkReceivedChunk(req RequestIndex, msg *pp.Message, ppReq R
 
        return
 }
+
+// Reconcile bytes transferred before connection was associated with a torrent.
+func (c *PeerConn) reconcileHandshakeStats() {
+       panicif.True(c.reconciledHandshakeStats)
+       if c._stats != (ConnStats{
+               // Handshakes should only increment these fields:
+               BytesWritten: c._stats.BytesWritten,
+               BytesRead:    c._stats.BytesRead,
+       }) {
+               panic("bad stats")
+       }
+       // Add the stat data so far to relevant Torrent stats that were skipped before the handshake
+       // completed.
+       c.relevantConnStats(&c.t.connStats)(func(cs *ConnStats) bool {
+               cs.BytesRead.Add(c._stats.BytesRead.Int64())
+               cs.BytesWritten.Add(c._stats.BytesWritten.Int64())
+               return true
+       })
+       c.reconciledHandshakeStats = true
+}
index 999206078d87cc6b30ef2bde497251eaaa8db9a5..2f67ac29a0e3ee5d5001a20aca8b36523666cab2 100644 (file)
@@ -7,11 +7,24 @@ import (
 // Due to ConnStats, may require special alignment on some platforms. See
 // https://github.com/anacrolix/torrent/issues/383.
 type TorrentStats struct {
+       AllConnStats
+       TorrentStatCounters
+       TorrentGauges
+}
+
+type AllConnStats struct {
        // Aggregates stats over all connections past and present. Some values may not have much meaning
        // in the aggregate context.
        ConnStats
-       TorrentStatCounters
-       TorrentGauges
+       WebSeeds  ConnStats
+       PeerConns ConnStats
+}
+
+func (me *AllConnStats) Copy() (ret AllConnStats) {
+       ret.ConnStats = me.ConnStats.Copy()
+       ret.WebSeeds = me.WebSeeds.Copy()
+       ret.PeerConns = me.PeerConns.Copy()
+       return
 }
 
 // Instantaneous metrics in Torrents, and aggregated for Clients.
index 773d25f11e02250c3113a9c7d4ea878d1ad5c18d..3960d7d43f5439919c15937652ac0fb62669f5e8 100644 (file)
@@ -67,7 +67,7 @@ var errTorrentClosed = errors.New("torrent closed")
 type Torrent struct {
        // Torrent-level aggregate statistics. First in struct to ensure 64-bit
        // alignment. See #262.
-       connStats ConnStats
+       connStats AllConnStats
        counters  TorrentStatCounters
 
        cl       *Client
@@ -2322,7 +2322,7 @@ func (t *Torrent) gauges() (ret TorrentGauges) {
 }
 
 func (t *Torrent) statsLocked() (ret TorrentStats) {
-       ret.ConnStats = copyCountFields(&t.connStats)
+       ret.AllConnStats = t.connStats.Copy()
        ret.TorrentStatCounters = copyCountFields(&t.counters)
        ret.TorrentGauges = t.gauges()
        return
@@ -2348,23 +2348,6 @@ func (t *Torrent) numTotalPeers() int {
        return len(peers)
 }
 
-// Reconcile bytes transferred before connection was associated with a
-// torrent.
-func (t *Torrent) reconcileHandshakeStats(c *Peer) {
-       if c._stats != (ConnStats{
-               // Handshakes should only increment these fields:
-               BytesWritten: c._stats.BytesWritten,
-               BytesRead:    c._stats.BytesRead,
-       }) {
-               panic("bad stats")
-       }
-       c.postHandshakeStats(func(cs *ConnStats) {
-               cs.BytesRead.Add(c._stats.BytesRead.Int64())
-               cs.BytesWritten.Add(c._stats.BytesWritten.Int64())
-       })
-       c.reconciledHandshakeStats = true
-}
-
 // Returns true if the connection is added.
 func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
        defer func() {
@@ -2518,13 +2501,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
        }()
 
        if passed {
-               if len(p.dirtiers) != 0 {
-                       // Don't increment stats above connection-level for every involved connection.
-                       t.allStats((*ConnStats).incrementPiecesDirtiedGood)
-               }
-               for c := range p.dirtiers {
-                       c._stats.incrementPiecesDirtiedGood()
-               }
+               t.incrementPiecesDirtiedStats(p, (*ConnStats).incrementPiecesDirtiedGood)
                t.clearPieceTouchers(piece)
                hasDirty := p.hasDirtyChunks()
                t.cl.unlock()
@@ -2546,13 +2523,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
                if len(p.dirtiers) != 0 && p.allChunksDirty() && hashIoErr == nil {
                        // Peers contributed to all the data for this piece hash failure, and the failure was
                        // not due to errors in the storage (such as data being dropped in a cache).
-
-                       // Increment Torrent and above stats, and then specific connections.
-                       t.allStats((*ConnStats).incrementPiecesDirtiedBad)
-                       for c := range p.dirtiers {
-                               // Y u do dis peer?!
-                               c.stats().incrementPiecesDirtiedBad()
-                       }
+                       t.incrementPiecesDirtiedStats(p, (*ConnStats).incrementPiecesDirtiedBad)
 
                        bannableTouchers := make([]*Peer, 0, len(p.dirtiers))
                        for c := range p.dirtiers {
@@ -2919,13 +2890,6 @@ func (t *Torrent) AddClientPeer(cl *Client) int {
        }())
 }
 
-// All stats that include this Torrent. Useful when we want to increment ConnStats but not for every
-// connection.
-func (t *Torrent) allStats(f func(*ConnStats)) {
-       f(&t.connStats)
-       f(&t.cl.connStats)
-}
-
 func (t *Torrent) hashingPiece(i pieceIndex) bool {
        return t.pieces[i].hashing
 }
@@ -3066,10 +3030,10 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) bool {
        const defaultMaxRequests = 2
        ws := webseedPeer{
                peer: Peer{
-                       t:                        t,
-                       outgoing:                 true,
-                       Network:                  "http",
-                       reconciledHandshakeStats: true,
+                       cl:       t.cl,
+                       t:        t,
+                       outgoing: true,
+                       Network:  "http",
                        // TODO: Set ban prefix?
                        RemoteAddr: remoteAddrFromUrl(url),
                        callbacks:  t.callbacks(),
@@ -3618,3 +3582,29 @@ func (t *Torrent) hasActiveWebseedRequests() bool {
        }
        return false
 }
+
+// Increment pieces dirtied for conns and aggregate upstreams.
+func (t *Torrent) incrementPiecesDirtiedStats(p *Piece, inc func(stats *ConnStats) bool) {
+       if len(p.dirtiers) == 0 {
+               // Avoid allocating map.
+               return
+       }
+       // 4 == 2 peerImpls (PeerConn and webseedPeer) and 1 base * one AllConnStats for each of Torrent
+       // and Client.
+       distinctUpstreamConnStats := make(map[*ConnStats]struct{}, 6)
+       for c := range p.dirtiers {
+               // Apply directly for each peer to avoid allocation.
+               inc(&c._stats)
+               // Collect distinct upstream connection stats.
+               count := 0
+               for cs := range c.upstreamConnStats() {
+                       distinctUpstreamConnStats[cs] = struct{}{}
+                       count++
+               }
+               // All dirtiers should have both Torrent and Client stats for both base and impl-ConnStats.
+               panicif.NotEq(count, 4)
+       }
+       // TODO: Have a debug assert/dev logging version of this.
+       panicif.GreaterThan(len(distinctUpstreamConnStats), 6)
+       maps.Keys(distinctUpstreamConnStats)(inc)
+}
index 35fd8ffef86a251634b3f8612efeb26627340661..9b64680fe47bc73e760c429e1da54c6fecdb910a 100644 (file)
@@ -32,6 +32,10 @@ type webseedPeer struct {
        hostKey          webseedHostKeyHandle
 }
 
+func (*webseedPeer) allConnStatsImplField(stats *AllConnStats) *ConnStats {
+       return &stats.WebSeeds
+}
+
 func (me *webseedPeer) cancelAllRequests() {
        // Is there any point to this? Won't we fail to receive a chunk and cancel anyway? Should we
        // Close requests instead?