From b85facb2e9a718e3a732a01ac7245f9a6e1f4e48 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 9 Apr 2025 16:21:06 +1000 Subject: [PATCH] Add BytesHashed count and refactor for other Torrent-level counts --- atomic-count.go | 39 ++++++++++++++++++++++++++++++++++ client-stats.go | 9 ++++++-- client.go | 1 + conn_stats.go => conn-stats.go | 35 ++---------------------------- issue97_test.go | 13 ++++-------- peer.go | 2 +- torrent-stats.go | 22 +++++++++++++++++++ torrent.go | 38 ++++++++++++++++++++++++--------- 8 files changed, 104 insertions(+), 55 deletions(-) create mode 100644 atomic-count.go rename conn_stats.go => conn-stats.go (77%) diff --git a/atomic-count.go b/atomic-count.go new file mode 100644 index 00000000..415f427a --- /dev/null +++ b/atomic-count.go @@ -0,0 +1,39 @@ +package torrent + +import ( + "encoding/json" + "fmt" + "reflect" + "sync/atomic" +) + +type Count struct { + n int64 +} + +var _ fmt.Stringer = (*Count)(nil) + +func (me *Count) Add(n int64) { + atomic.AddInt64(&me.n, n) +} + +func (me *Count) Int64() int64 { + return atomic.LoadInt64(&me.n) +} + +func (me *Count) String() string { + return fmt.Sprintf("%v", me.Int64()) +} + +func (me *Count) MarshalJSON() ([]byte, error) { + return json.Marshal(me.n) +} + +// TODO: Can this use more generics to speed it up? Should we be checking the field types? +func copyCountFields[T any](src *T) (dst T) { + for i := 0; i < reflect.TypeFor[T]().NumField(); i++ { + n := reflect.ValueOf(src).Elem().Field(i).Addr().Interface().(*Count).Int64() + reflect.ValueOf(&dst).Elem().Field(i).Addr().Interface().(*Count).Add(n) + } + return +} diff --git a/client-stats.go b/client-stats.go index bfa6994e..fb080dbb 100644 --- a/client-stats.go +++ b/client-stats.go @@ -20,7 +20,7 @@ type clientHolepunchAddrSets struct { } type ClientStats struct { - ConnStats + TorrentStats // Ongoing outgoing dial attempts. There may be more than one dial going on per peer address due // to hole-punch connect requests. The total may not match the sum of attempts for all Torrents @@ -39,7 +39,12 @@ type ClientStats struct { } func (cl *Client) statsLocked() (stats ClientStats) { - stats.ConnStats = cl.connStats.Copy() + stats.ConnStats = copyCountFields(&cl.connStats) + stats.TorrentStatCounters = copyCountFields(&cl.counters) + for t := range cl.torrents { + stats.TorrentGauges.Add(t.gauges()) + } + stats.ActiveHalfOpenAttempts = cl.numHalfOpen stats.NumPeersUndialableWithoutHolepunch = len(cl.undialableWithoutHolepunch) diff --git a/client.go b/client.go index ffaff0d5..7b7d979d 100644 --- a/client.go +++ b/client.go @@ -56,6 +56,7 @@ type Client struct { // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of // fields. See #262. connStats ConnStats + counters TorrentStatCounters _mu lockWithDeferreds event sync.Cond diff --git a/conn_stats.go b/conn-stats.go similarity index 77% rename from conn_stats.go rename to conn-stats.go index 0c5bfc78..394ebfdf 100644 --- a/conn_stats.go +++ b/conn-stats.go @@ -1,13 +1,8 @@ package torrent import ( - "encoding/json" - "fmt" - "io" - "reflect" - "sync/atomic" - pp "github.com/anacrolix/torrent/peer_protocol" + "io" ) // Various connection-level metrics. At the Torrent level these are aggregates. Chunks are messages @@ -41,33 +36,7 @@ type ConnStats struct { } func (me *ConnStats) Copy() (ret ConnStats) { - for i := 0; i < reflect.TypeOf(ConnStats{}).NumField(); i++ { - n := reflect.ValueOf(me).Elem().Field(i).Addr().Interface().(*Count).Int64() - reflect.ValueOf(&ret).Elem().Field(i).Addr().Interface().(*Count).Add(n) - } - return -} - -type Count struct { - n int64 -} - -var _ fmt.Stringer = (*Count)(nil) - -func (me *Count) Add(n int64) { - atomic.AddInt64(&me.n, n) -} - -func (me *Count) Int64() int64 { - return atomic.LoadInt64(&me.n) -} - -func (me *Count) String() string { - return fmt.Sprintf("%v", me.Int64()) -} - -func (me *Count) MarshalJSON() ([]byte, error) { - return json.Marshal(me.n) + return copyCountFields(me) } func (cs *ConnStats) wroteMsg(msg *pp.Message) { diff --git a/issue97_test.go b/issue97_test.go index 9e14f047..72fc3428 100644 --- a/issue97_test.go +++ b/issue97_test.go @@ -1,26 +1,21 @@ package torrent import ( + "github.com/anacrolix/torrent/metainfo" + "github.com/anacrolix/torrent/storage" "testing" - "github.com/anacrolix/log" "github.com/stretchr/testify/require" "github.com/anacrolix/torrent/internal/testutil" - "github.com/anacrolix/torrent/storage" ) func TestHashPieceAfterStorageClosed(t *testing.T) { + cl := newTestingClient(t) td := t.TempDir() cs := storage.NewFile(td) defer cs.Close() - tt := &Torrent{ - storageOpener: storage.NewClient(cs), - logger: log.Default, - chunkSize: defaultChunkSize, - } - tt.infoHash.Ok = true - tt.infoHash.Value[0] = 1 + tt := cl.newTorrent(metainfo.Hash{1}, cs) mi := testutil.GreetingMetaInfo() info, err := mi.UnmarshalInfo() require.NoError(t, err) diff --git a/peer.go b/peer.go index 4e49468f..35d7066b 100644 --- a/peer.go +++ b/peer.go @@ -531,7 +531,7 @@ func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func { // connection. func (cn *Peer) postHandshakeStats(f func(*ConnStats)) { t := cn.t - f(&t.stats) + f(&t.connStats) f(&t.cl.connStats) } diff --git a/torrent-stats.go b/torrent-stats.go index 0dd58add..3f3586e5 100644 --- a/torrent-stats.go +++ b/torrent-stats.go @@ -1,12 +1,21 @@ package torrent +import ( + "reflect" +) + // Due to ConnStats, may require special alignment on some platforms. See // https://github.com/anacrolix/torrent/issues/383. type TorrentStats struct { // Aggregates stats over all connections past and present. Some values may not have much meaning // in the aggregate context. ConnStats + TorrentStatCounters + TorrentGauges +} +// Instantaneous metrics in Torrents, and aggregated for Clients. +type TorrentGauges struct { // Ordered by expected descending quantities (if all is well). TotalPeers int PendingPeers int @@ -15,3 +24,16 @@ type TorrentStats struct { HalfOpenPeers int PiecesComplete int } + +func (me *TorrentGauges) Add(agg TorrentGauges) { + src := reflect.ValueOf(agg) + dst := reflect.ValueOf(me).Elem() + for i := 0; i < reflect.TypeFor[TorrentGauges]().NumField(); i++ { + *dst.Field(i).Addr().Interface().(*int) += src.Field(i).Interface().(int) + } + return +} + +type TorrentStatCounters struct { + BytesHashed Count +} diff --git a/torrent.go b/torrent.go index 33dd9fc4..c1a381db 100644 --- a/torrent.go +++ b/torrent.go @@ -59,7 +59,9 @@ var errTorrentClosed = errors.New("torrent closed") type Torrent struct { // Torrent-level aggregate statistics. First in struct to ensure 64-bit // alignment. See #262. - stats ConnStats + connStats ConnStats + counters TorrentStatCounters + cl *Client logger log.Logger @@ -1143,9 +1145,14 @@ func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWr } } +func (t *Torrent) countBytesHashed(n int64) { + t.counters.BytesHashed.Add(n) + t.cl.counters.BytesHashed.Add(n) +} + func (t *Torrent) hashPiece(piece pieceIndex) ( correct bool, - // These are peers that sent us blocks that differ from what we hash here. +// These are peers that sent us blocks that differ from what we hash here. differingPeers map[bannableAddr]struct{}, err error, ) { @@ -1159,6 +1166,9 @@ func (t *Torrent) hashPiece(piece pieceIndex) ( var sum metainfo.Hash // log.Printf("A piece decided to self-hash: %d", piece) sum, err = i.SelfHash() + if err == nil { + t.countBytesHashed(int64(p.length())) + } correct = sum == *p.hash // Can't do smart banning without reading the piece. The smartBanCache is still cleared // in pieceHasher regardless. @@ -1180,6 +1190,7 @@ func (t *Torrent) hashPiece(piece pieceIndex) ( err, )) } + t.countBytesHashed(written) } var sum [20]byte sumExactly(sum[:], h.Sum) @@ -1215,7 +1226,7 @@ func sumExactly(dst []byte, sum func(b []byte) []byte) { } func (t *Torrent) hashPieceWithSpecificHash(piece pieceIndex, h hash.Hash) ( - // These are peers that sent us blocks that differ from what we hash here. +// These are peers that sent us blocks that differ from what we hash here. differingPeers map[bannableAddr]struct{}, err error, ) { @@ -1233,6 +1244,7 @@ func (t *Torrent) hashPieceWithSpecificHash(piece pieceIndex, h hash.Hash) ( // ban peers for all recorded blocks that weren't just written. return } + t.countBytesHashed(written) } // Flush before writing padding, since we would not have recorded the padding blocks. smartBanWriter.Flush() @@ -1256,8 +1268,8 @@ func (t *Torrent) havePiece(index pieceIndex) bool { } func (t *Torrent) maybeDropMutuallyCompletePeer( - // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's - // okay? +// I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's +// okay? p *PeerConn, ) { if !t.cl.config.DropMutuallyCompletePeers { @@ -2048,9 +2060,9 @@ func (t *Torrent) announceRequest( // The following are vaguely described in BEP 3. Left: t.bytesLeftAnnounce(), - Uploaded: t.stats.BytesWrittenData.Int64(), + Uploaded: t.connStats.BytesWrittenData.Int64(), // There's no mention of wasted or unwanted download in the BEP. - Downloaded: t.stats.BytesReadUsefulData.Int64(), + Downloaded: t.connStats.BytesReadUsefulData.Int64(), } } @@ -2201,7 +2213,7 @@ func (t *Torrent) Stats() TorrentStats { return t.statsLocked() } -func (t *Torrent) statsLocked() (ret TorrentStats) { +func (t *Torrent) gauges() (ret TorrentGauges) { ret.ActivePeers = len(t.conns) ret.HalfOpenPeers = len(t.halfOpen) ret.PendingPeers = t.peers.Len() @@ -2212,11 +2224,17 @@ func (t *Torrent) statsLocked() (ret TorrentStats) { ret.ConnectedSeeders++ } } - ret.ConnStats = t.stats.Copy() ret.PiecesComplete = t.numPiecesCompleted() return } +func (t *Torrent) statsLocked() (ret TorrentStats) { + ret.ConnStats = copyCountFields(&t.connStats) + ret.TorrentStatCounters = copyCountFields(&t.counters) + ret.TorrentGauges = t.gauges() + return +} + // The total number of peers in the torrent. func (t *Torrent) numTotalPeers() int { peers := make(map[string]struct{}) @@ -2773,7 +2791,7 @@ func (t *Torrent) AddClientPeer(cl *Client) int { // All stats that include this Torrent. Useful when we want to increment ConnStats but not for every // connection. func (t *Torrent) allStats(f func(*ConnStats)) { - f(&t.stats) + f(&t.connStats) f(&t.cl.connStats) } -- 2.48.1