--- /dev/null
+package torrent
+
+import (
+ "encoding/json"
+ "fmt"
+ "reflect"
+ "sync/atomic"
+)
+
+type Count struct {
+ n int64
+}
+
+var _ fmt.Stringer = (*Count)(nil)
+
+func (me *Count) Add(n int64) {
+ atomic.AddInt64(&me.n, n)
+}
+
+func (me *Count) Int64() int64 {
+ return atomic.LoadInt64(&me.n)
+}
+
+func (me *Count) String() string {
+ return fmt.Sprintf("%v", me.Int64())
+}
+
+func (me *Count) MarshalJSON() ([]byte, error) {
+ return json.Marshal(me.Int64())
+}
+
+// copyCountFields returns a copy of src in which every Count field has been loaded atomically. It
+// panics if any field of T is not a Count.
+// TODO: Can this use more generics to speed it up? Should we be checking the field types?
+func copyCountFields[T any](src *T) (dst T) {
+ for i := 0; i < reflect.TypeFor[T]().NumField(); i++ {
+ n := reflect.ValueOf(src).Elem().Field(i).Addr().Interface().(*Count).Int64()
+ reflect.ValueOf(&dst).Elem().Field(i).Addr().Interface().(*Count).Add(n)
+ }
+ return
+}
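For orientation only, not part of the patch: a minimal sketch of how copyCountFields is meant to be used, with a hypothetical exampleCounters struct whose fields are all Count, mirroring the real call sites that pass ConnStats and TorrentStatCounters.

```go
// Hypothetical struct for illustration; any struct whose fields are all Count works.
type exampleCounters struct {
	BytesRead    Count
	BytesWritten Count
}

// Snapshot the live counters without holding a lock: each field is loaded
// atomically and summed into a fresh zero value.
func snapshotExample(live *exampleCounters) exampleCounters {
	return copyCountFields(live)
}
```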
}
type ClientStats struct {
- ConnStats
+ TorrentStats
// Ongoing outgoing dial attempts. There may be more than one dial going on per peer address due
// to hole-punch connect requests. The total may not match the sum of attempts for all Torrents
}
func (cl *Client) statsLocked() (stats ClientStats) {
- stats.ConnStats = cl.connStats.Copy()
+ stats.ConnStats = copyCountFields(&cl.connStats)
+ stats.TorrentStatCounters = copyCountFields(&cl.counters)
+ for t := range cl.torrents {
+ stats.TorrentGauges.Add(t.gauges())
+ }
+
stats.ActiveHalfOpenAttempts = cl.numHalfOpen
stats.NumPeersUndialableWithoutHolepunch = len(cl.undialableWithoutHolepunch)
// An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
// fields. See #262.
connStats ConnStats
+ counters TorrentStatCounters
_mu lockWithDeferreds
event sync.Cond
package torrent
import (
- "encoding/json"
- "fmt"
- "io"
- "reflect"
- "sync/atomic"
-
pp "github.com/anacrolix/torrent/peer_protocol"
+ "io"
)
// Various connection-level metrics. At the Torrent level these are aggregates. Chunks are messages
}
func (me *ConnStats) Copy() (ret ConnStats) {
- for i := 0; i < reflect.TypeOf(ConnStats{}).NumField(); i++ {
- n := reflect.ValueOf(me).Elem().Field(i).Addr().Interface().(*Count).Int64()
- reflect.ValueOf(&ret).Elem().Field(i).Addr().Interface().(*Count).Add(n)
- }
- return
-}
-
-type Count struct {
- n int64
-}
-
-var _ fmt.Stringer = (*Count)(nil)
-
-func (me *Count) Add(n int64) {
- atomic.AddInt64(&me.n, n)
-}
-
-func (me *Count) Int64() int64 {
- return atomic.LoadInt64(&me.n)
-}
-
-func (me *Count) String() string {
- return fmt.Sprintf("%v", me.Int64())
-}
-
-func (me *Count) MarshalJSON() ([]byte, error) {
- return json.Marshal(me.n)
+ return copyCountFields(me)
}
func (cs *ConnStats) wroteMsg(msg *pp.Message) {
package torrent
import (
+ "github.com/anacrolix/torrent/metainfo"
+ "github.com/anacrolix/torrent/storage"
"testing"
- "github.com/anacrolix/log"
"github.com/stretchr/testify/require"
"github.com/anacrolix/torrent/internal/testutil"
- "github.com/anacrolix/torrent/storage"
)
func TestHashPieceAfterStorageClosed(t *testing.T) {
+ cl := newTestingClient(t)
td := t.TempDir()
cs := storage.NewFile(td)
defer cs.Close()
- tt := &Torrent{
- storageOpener: storage.NewClient(cs),
- logger: log.Default,
- chunkSize: defaultChunkSize,
- }
- tt.infoHash.Ok = true
- tt.infoHash.Value[0] = 1
+ tt := cl.newTorrent(metainfo.Hash{1}, cs)
mi := testutil.GreetingMetaInfo()
info, err := mi.UnmarshalInfo()
require.NoError(t, err)
// connection.
func (cn *Peer) postHandshakeStats(f func(*ConnStats)) {
t := cn.t
- f(&t.stats)
+ f(&t.connStats)
f(&t.cl.connStats)
}
package torrent
+import (
+ "reflect"
+)
+
// Due to ConnStats, may require special alignment on some platforms. See
// https://github.com/anacrolix/torrent/issues/383.
type TorrentStats struct {
// Aggregates stats over all connections past and present. Some values may not have much meaning
// in the aggregate context.
ConnStats
+ TorrentStatCounters
+ TorrentGauges
+}
+
+// Instantaneous metrics tracked per Torrent, and aggregated over all Torrents for the Client.
+type TorrentGauges struct {
// Ordered by expected descending quantities (if all is well).
 TotalPeers int
 PendingPeers int
 ActivePeers int
 ConnectedSeeders int
 HalfOpenPeers int
 PiecesComplete int
}
+
+// Add sums the fields of agg into me. Every TorrentGauges field must be an int.
+func (me *TorrentGauges) Add(agg TorrentGauges) {
+ src := reflect.ValueOf(agg)
+ dst := reflect.ValueOf(me).Elem()
+ for i := 0; i < reflect.TypeFor[TorrentGauges]().NumField(); i++ {
+ *dst.Field(i).Addr().Interface().(*int) += src.Field(i).Interface().(int)
+ }
+}
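For reference, and assuming the gauge fields visible in this diff plus the ActivePeers and ConnectedSeeders fields that gauges() sets below, the reflection loop above is equivalent to this explicit field-by-field sum (illustration only, not part of the patch):

```go
// Non-reflective equivalent of TorrentGauges.Add, shown for clarity only.
func (me *TorrentGauges) addExplicit(agg TorrentGauges) {
	me.TotalPeers += agg.TotalPeers
	me.PendingPeers += agg.PendingPeers
	me.ActivePeers += agg.ActivePeers
	me.ConnectedSeeders += agg.ConnectedSeeders
	me.HalfOpenPeers += agg.HalfOpenPeers
	me.PiecesComplete += agg.PiecesComplete
}
```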
+
+// Cumulative counters kept per Torrent, and also aggregated on the Client.
+type TorrentStatCounters struct {
+ BytesHashed Count
+}
type Torrent struct {
// Torrent-level aggregate statistics. First in struct to ensure 64-bit
// alignment. See #262.
- stats ConnStats
+ connStats ConnStats
+ counters TorrentStatCounters
+
cl *Client
logger log.Logger
}
}
+func (t *Torrent) countBytesHashed(n int64) {
+ t.counters.BytesHashed.Add(n)
+ t.cl.counters.BytesHashed.Add(n)
+}
+
func (t *Torrent) hashPiece(piece pieceIndex) (
correct bool,
 // These are peers that sent us blocks that differ from what we hash here.
differingPeers map[bannableAddr]struct{},
err error,
) {
var sum metainfo.Hash
// log.Printf("A piece decided to self-hash: %d", piece)
sum, err = i.SelfHash()
+ if err == nil {
+ t.countBytesHashed(int64(p.length()))
+ }
correct = sum == *p.hash
// Can't do smart banning without reading the piece. The smartBanCache is still cleared
// in pieceHasher regardless.
err,
))
}
+ t.countBytesHashed(written)
}
var sum [20]byte
sumExactly(sum[:], h.Sum)
}
func (t *Torrent) hashPieceWithSpecificHash(piece pieceIndex, h hash.Hash) (
 // These are peers that sent us blocks that differ from what we hash here.
differingPeers map[bannableAddr]struct{},
err error,
) {
// ban peers for all recorded blocks that weren't just written.
return
}
+ t.countBytesHashed(written)
}
// Flush before writing padding, since we would not have recorded the padding blocks.
smartBanWriter.Flush()
}
func (t *Torrent) maybeDropMutuallyCompletePeer(
 // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's
 // okay?
p *PeerConn,
) {
if !t.cl.config.DropMutuallyCompletePeers {
// The following are vaguely described in BEP 3.
Left: t.bytesLeftAnnounce(),
- Uploaded: t.stats.BytesWrittenData.Int64(),
+ Uploaded: t.connStats.BytesWrittenData.Int64(),
// There's no mention of wasted or unwanted download in the BEP.
- Downloaded: t.stats.BytesReadUsefulData.Int64(),
+ Downloaded: t.connStats.BytesReadUsefulData.Int64(),
}
}
return t.statsLocked()
}
-func (t *Torrent) statsLocked() (ret TorrentStats) {
+// Instantaneous gauge values for this Torrent.
+func (t *Torrent) gauges() (ret TorrentGauges) {
ret.ActivePeers = len(t.conns)
ret.HalfOpenPeers = len(t.halfOpen)
ret.PendingPeers = t.peers.Len()
ret.ConnectedSeeders++
}
}
- ret.ConnStats = t.stats.Copy()
ret.PiecesComplete = t.numPiecesCompleted()
return
}
+func (t *Torrent) statsLocked() (ret TorrentStats) {
+ ret.ConnStats = copyCountFields(&t.connStats)
+ ret.TorrentStatCounters = copyCountFields(&t.counters)
+ ret.TorrentGauges = t.gauges()
+ return
+}
+
// The total number of peers in the torrent.
func (t *Torrent) numTotalPeers() int {
peers := make(map[string]struct{})
// All stats that include this Torrent. Useful when we want to increment ConnStats but not for every
// connection.
func (t *Torrent) allStats(f func(*ConnStats)) {
- f(&t.stats)
+ f(&t.connStats)
f(&t.cl.connStats)
}