// TODO: Can this use more generics to speed it up? Should we be checking the field types?
func copyCountFields[T any](src *T) (dst T) {
+ srcValue := reflect.ValueOf(src).Elem()
+ dstValue := reflect.ValueOf(&dst).Elem()
for i := 0; i < reflect.TypeFor[T]().NumField(); i++ {
- n := reflect.ValueOf(src).Elem().Field(i).Addr().Interface().(*Count).Int64()
- reflect.ValueOf(&dst).Elem().Field(i).Addr().Interface().(*Count).Add(n)
+ n := srcValue.Field(i).Addr().Interface().(*Count).Int64()
+ dstValue.Field(i).Addr().Interface().(*Count).Add(n)
}
return
}
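One hedged answer to the TODO above about checking field types: assert up front that every field of T is a Count before copying. An editor's sketch only, not part of this change, with a hypothetical name, assuming Count is the only field type these stat structs should contain:

	// Editor's sketch: copyCountFields with a field-type check.
	func copyCountFieldsChecked[T any](src *T) (dst T) {
		structType := reflect.TypeFor[T]()
		countType := reflect.TypeFor[Count]()
		srcValue := reflect.ValueOf(src).Elem()
		dstValue := reflect.ValueOf(&dst).Elem()
		for i := 0; i < structType.NumField(); i++ {
			if structType.Field(i).Type != countType {
				panic("copyCountFieldsChecked: field " + structType.Field(i).Name + " is not a Count")
			}
			n := srcValue.Field(i).Addr().Interface().(*Count).Int64()
			dstValue.Field(i).Addr().Interface().(*Count).Add(n)
		}
		return
	}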
}
func (cl *Client) statsLocked() (stats ClientStats) {
- stats.ConnStats = copyCountFields(&cl.connStats)
+ stats.AllConnStats = cl.connStats.Copy()
stats.TorrentStatCounters = copyCountFields(&cl.counters)
for t := range cl.torrents {
stats.TorrentGauges.Add(t.gauges())
type Client struct {
// An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
// fields. See #262.
- connStats ConnStats
+ connStats AllConnStats
counters TorrentStatCounters
_mu lockWithDeferreds
piece := d.Piece
switch d.Type {
case pp.DataMetadataExtensionMsgType:
- c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
+ c.modifyRelevantConnStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
if !c.requestedMetadataPiece(piece) {
return fmt.Errorf("got unexpected piece %d", piece)
}
}
c = &PeerConn{
Peer: Peer{
+ cl: cl,
outgoing: opts.outgoing,
choking: true,
peerChoking: true,
}
// Returns connection-level aggregate connStats at the Client level. See the comment on
-// TorrentStats.ConnStats.
+// TorrentStats.ConnStats. You probably want Client.Stats() instead.
func (cl *Client) ConnStats() ConnStats {
- return cl.connStats.Copy()
+ return cl.connStats.ConnStats.Copy()
}
func (cl *Client) Stats() ClientStats {
cs.BytesReadData.Add(size)
}
-func (cs *ConnStats) incrementPiecesDirtiedGood() {
+func (cs *ConnStats) incrementPiecesDirtiedGood() bool {
cs.PiecesDirtiedGood.Add(1)
+ // Used as an iteration callback (func(*ConnStats) bool); always continue iterating.
+ return true
}
-func (cs *ConnStats) incrementPiecesDirtiedBad() {
+func (cs *ConnStats) incrementPiecesDirtiedBad() bool {
cs.PiecesDirtiedBad.Add(1)
+ // Used as an iteration callback (func(*ConnStats) bool); always continue iterating.
+ return true
}
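These return bool so that a method expression matches the func(*ConnStats) bool callback shape that an iter.Seq[*ConnStats] yields to (see incrementPiecesDirtiedStats further down). A minimal sketch of that usage, with a hypothetical variable and the standard library maps package assumed:

	// Editor's sketch: drive an iter.Seq[*ConnStats] directly with a method expression.
	var distinct map[*ConnStats]struct{} // hypothetical; e.g. collected via upstreamConnStats
	maps.Keys(distinct)((*ConnStats).incrementPiecesDirtiedGood)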
func add(n int64, f func(*ConnStats) *Count) func(*ConnStats) {
--- /dev/null
+package torrent
"strings"
"testing"
- "github.com/anacrolix/missinggo/iter"
- "github.com/anacrolix/missinggo/v2/bitmap"
"github.com/davecgh/go-spew/spew"
"github.com/stretchr/testify/assert"
)
check(13, 5, 13, Request{}, false)
}
-func BenchmarkIterBitmapsDistinct(t *testing.B) {
- t.ReportAllocs()
- for i := 0; i < t.N; i += 1 {
- var skip, first, second bitmap.Bitmap
- skip.Add(1)
- first.Add(1, 0, 3)
- second.Add(1, 2, 0)
- skipCopy := skip.Copy()
- t.StartTimer()
- output := iter.ToSlice(iterBitmapsDistinct(&skipCopy, first, second))
- t.StopTimer()
- assert.Equal(t, []interface{}{0, 3, 2}, output)
- assert.Equal(t, []bitmap.BitIndex{1}, skip.ToSortedSlice())
- }
-}
-
func TestSpewConnStats(t *testing.T) {
s := spew.Sdump(ConnStats{})
t.Logf("\n%s", s)
// Whether we're expecting to receive chunks because we have outstanding requests. Used for
// example to calculate download rate.
expectingChunks() bool
+ // Returns the peer-impl specific aggregate ConnStats field within an AllConnStats.
+ allConnStatsImplField(*AllConnStats) *ConnStats
}
"context"
"fmt"
"io"
+ "iter"
"log/slog"
"net"
"strings"
"github.com/anacrolix/chansync"
. "github.com/anacrolix/generics"
"github.com/anacrolix/log"
- "github.com/anacrolix/missinggo/iter"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/panicif"
"github.com/anacrolix/multiless"
-
"github.com/anacrolix/torrent/internal/alloclim"
"github.com/anacrolix/torrent/mse"
pp "github.com/anacrolix/torrent/peer_protocol"
// First to ensure 64-bit alignment for atomics. See #262.
_stats ConnStats
- t *Torrent
+ cl *Client
+ t *Torrent
legacyPeerImpl
peerImpl newHotPeerImpl
// True if the connection is operating over MSE obfuscation.
headerEncrypted bool
cryptoMethod mse.CryptoMethod
- // Set true after we've added our ConnStats generated during handshake to
- // other ConnStat instances as determined when the *Torrent became known.
- reconciledHandshakeStats bool
lastMessageReceived time.Time
completedHandshake time.Time
// are okay.
type messageWriter func(pp.Message) bool
-// Emits the indices in the Bitmaps bms in order, never repeating any index.
-// skip is mutated during execution, and its initial values will never be
-// emitted.
-func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
- return func(cb iter.Callback) {
- for _, bm := range bms {
- if !iter.All(
- func(_i interface{}) bool {
- i := _i.(int)
- if skip.Contains(bitmap.BitIndex(i)) {
- return true
- }
- skip.Add(bitmap.BitIndex(i))
- return cb(i)
- },
- bm.Iter,
- ) {
- return
- }
- }
+// Applies f to every ConnStats that includes this connection. The Torrent-level stats aren't
+// known until the handshake completes, after which the differences are reconciled.
+func (cn *Peer) modifyRelevantConnStats(f func(*ConnStats)) {
+ // Every peer has basic ConnStats for now.
+ f(&cn._stats)
+ incAll := func(stats *ConnStats) bool {
+ f(stats)
+ return true
}
+ cn.upstreamConnStats()(incAll)
}
-// After handshake, we know what Torrent and Client stats to include for a
-// connection.
-func (cn *Peer) postHandshakeStats(f func(*ConnStats)) {
- t := cn.t
- f(&t.connStats)
- f(&t.cl.connStats)
-}
-
-// All ConnStats that include this connection. Some objects are not known
-// until the handshake is complete, after which it's expected to reconcile the
-// differences.
-func (cn *Peer) allStats(f func(*ConnStats)) {
- f(&cn._stats)
- if cn.reconciledHandshakeStats {
- cn.postHandshakeStats(f)
+// Yields the relevant upstream ConnStats: Torrent-level when the Torrent is known, then
+// Client-level.
+func (cn *Peer) upstreamConnStats() iter.Seq[*ConnStats] {
+ return func(yield func(*ConnStats) bool) {
+ // cn.t is nil until the handshake completes and the connection is associated with a Torrent;
+ // those stats are reconciled afterwards (see reconcileHandshakeStats).
+ if cn.t != nil {
+ for cs := range cn.relevantConnStats(&cn.t.connStats) {
+ if !yield(cs) {
+ return
+ }
+ }
+ }
+ cn.relevantConnStats(&cn.cl.connStats)(yield)
+ }
}
func (cn *Peer) readBytes(n int64) {
- cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead }))
+ cn.modifyRelevantConnStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead }))
}
func (c *Peer) lastHelpful() (ret time.Time) {
}
func (c *Peer) doChunkReadStats(size int64) {
- c.allStats(func(cs *ConnStats) { cs.receivedChunk(size) })
+ c.modifyRelevantConnStats(func(cs *ConnStats) { cs.receivedChunk(size) })
}
// Handle a received chunk from a peer. TODO: Break this out into non-wire protocol specific
if t.haveChunk(ppReq) {
// panic(fmt.Sprintf("%+v", ppReq))
ChunksReceived.Add("redundant", 1)
- c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
+ c.modifyRelevantConnStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
return nil
}
piece := t.piece(ppReq.Index.Int())
- c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
- c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
+ c.modifyRelevantConnStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
+ c.modifyRelevantConnStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
if intended {
- c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
+ c.modifyRelevantConnStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
}
for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData {
f(ReceivedUsefulDataEvent{c, msg})
return ret
}
-func (cn *Peer) stats() *ConnStats {
- return &cn._stats
-}
-
func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
pc, ok := p.legacyPeerImpl.(*PeerConn)
return pc, ok
panicif.NotNil(p.closedCtx)
p.closedCtx, p.closedCtxCancel = context.WithCancel(p.t.closedCtx)
}
+
+// Yields the base ConnStats and the peer-impl specific ConnStats from all.
+func (p *Peer) relevantConnStats(all *AllConnStats) iter.Seq[*ConnStats] {
+ return func(yield func(*ConnStats) bool) {
+ if !yield(&all.ConnStats) {
+ return
+ }
+ yield(p.peerImpl.allConnStatsImplField(all))
+ }
+}
receivedHashPieces map[[32]byte][][32]byte
peerRequestServerRunning bool
+ // Set true once the ConnStats accumulated during the handshake have been added to the
+ // Torrent-level stats, which aren't known until the connection is associated with a *Torrent.
+ reconciledHandshakeStats bool
+}
+
+func (*PeerConn) allConnStatsImplField(stats *AllConnStats) *ConnStats {
+ return &stats.PeerConns
}
func (cn *PeerConn) lastWriteUploadRate() float64 {
torrent.Add(fmt.Sprintf("Extended messages written for protocol %q", name), 1)
}
}
- cn.allStats(func(cs *ConnStats) { cs.wroteMsg(msg) })
+ cn.modifyRelevantConnStats(func(cs *ConnStats) { cs.wroteMsg(msg) })
}
func (cn *PeerConn) wroteBytes(n int64) {
- cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten }))
+ cn.modifyRelevantConnStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten }))
}
func (c *PeerConn) fastEnabled() bool {
c.initClosedCtx()
c.logger.WithDefaultLevel(log.Debug).Printf("set torrent=%v", t)
c.setPeerLoggers(t.logger, t.slogger())
- t.reconcileHandshakeStats(c.peerPtr())
+ c.reconcileHandshakeStats()
}
func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags {
return
}
+
+// Reconcile bytes transferred before the connection was associated with a torrent. Client-level
+// stats are counted live from construction (cl is set on the Peer), so only the Torrent-level
+// stats need reconciling here.
+func (c *PeerConn) reconcileHandshakeStats() {
+ panicif.True(c.reconciledHandshakeStats)
+ if c._stats != (ConnStats{
+ // Handshakes should only increment these fields:
+ BytesWritten: c._stats.BytesWritten,
+ BytesRead: c._stats.BytesRead,
+ }) {
+ panic("bad stats")
+ }
+ // Add the stat data so far to relevant Torrent stats that were skipped before the handshake
+ // completed.
+ c.relevantConnStats(&c.t.connStats)(func(cs *ConnStats) bool {
+ cs.BytesRead.Add(c._stats.BytesRead.Int64())
+ cs.BytesWritten.Add(c._stats.BytesWritten.Int64())
+ return true
+ })
+ c.reconciledHandshakeStats = true
+}
// Due to ConnStats, may require special alignment on some platforms. See
// https://github.com/anacrolix/torrent/issues/383.
type TorrentStats struct {
+ AllConnStats
+ TorrentStatCounters
+ TorrentGauges
+}
+
+type AllConnStats struct {
// Aggregates stats over all connections past and present. Some values may not have much meaning
// in the aggregate context.
ConnStats
- TorrentStatCounters
- TorrentGauges
+ // Aggregates over webseed connections only.
+ WebSeeds ConnStats
+ // Aggregates over regular peer (wire protocol) connections only.
+ PeerConns ConnStats
+}
+
+func (me *AllConnStats) Copy() (ret AllConnStats) {
+ ret.ConnStats = me.ConnStats.Copy()
+ ret.WebSeeds = me.WebSeeds.Copy()
+ ret.PeerConns = me.PeerConns.Copy()
+ return
}
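For consumers, a minimal sketch of reading the split aggregates. This is an editor's illustration rather than part of the change; BytesReadUsefulData is just a representative Count field:

	// Editor's sketch: overall vs per-transport read stats at the Client level.
	stats := cl.Stats()
	overall := stats.AllConnStats.ConnStats.BytesReadUsefulData.Int64()
	viaPeerConns := stats.AllConnStats.PeerConns.BytesReadUsefulData.Int64()
	viaWebSeeds := stats.AllConnStats.WebSeeds.BytesReadUsefulData.Int64()
	log.Printf("useful bytes read: overall=%v peerConns=%v webSeeds=%v", overall, viaPeerConns, viaWebSeeds)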
// Instantaneous metrics in Torrents, and aggregated for Clients.
type Torrent struct {
// Torrent-level aggregate statistics. First in struct to ensure 64-bit
// alignment. See #262.
- connStats ConnStats
+ connStats AllConnStats
counters TorrentStatCounters
cl *Client
}
func (t *Torrent) statsLocked() (ret TorrentStats) {
- ret.ConnStats = copyCountFields(&t.connStats)
+ ret.AllConnStats = t.connStats.Copy()
ret.TorrentStatCounters = copyCountFields(&t.counters)
ret.TorrentGauges = t.gauges()
return
return len(peers)
}
-// Reconcile bytes transferred before connection was associated with a
-// torrent.
-func (t *Torrent) reconcileHandshakeStats(c *Peer) {
- if c._stats != (ConnStats{
- // Handshakes should only increment these fields:
- BytesWritten: c._stats.BytesWritten,
- BytesRead: c._stats.BytesRead,
- }) {
- panic("bad stats")
- }
- c.postHandshakeStats(func(cs *ConnStats) {
- cs.BytesRead.Add(c._stats.BytesRead.Int64())
- cs.BytesWritten.Add(c._stats.BytesWritten.Int64())
- })
- c.reconciledHandshakeStats = true
-}
-
// Returns true if the connection is added.
func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
defer func() {
}()
if passed {
- if len(p.dirtiers) != 0 {
- // Don't increment stats above connection-level for every involved connection.
- t.allStats((*ConnStats).incrementPiecesDirtiedGood)
- }
- for c := range p.dirtiers {
- c._stats.incrementPiecesDirtiedGood()
- }
+ t.incrementPiecesDirtiedStats(p, (*ConnStats).incrementPiecesDirtiedGood)
t.clearPieceTouchers(piece)
hasDirty := p.hasDirtyChunks()
t.cl.unlock()
if len(p.dirtiers) != 0 && p.allChunksDirty() && hashIoErr == nil {
// Peers contributed to all the data for this piece hash failure, and the failure was
// not due to errors in the storage (such as data being dropped in a cache).
-
- // Increment Torrent and above stats, and then specific connections.
- t.allStats((*ConnStats).incrementPiecesDirtiedBad)
- for c := range p.dirtiers {
- // Y u do dis peer?!
- c.stats().incrementPiecesDirtiedBad()
- }
+ t.incrementPiecesDirtiedStats(p, (*ConnStats).incrementPiecesDirtiedBad)
bannableTouchers := make([]*Peer, 0, len(p.dirtiers))
for c := range p.dirtiers {
}())
}
-// All stats that include this Torrent. Useful when we want to increment ConnStats but not for every
-// connection.
-func (t *Torrent) allStats(f func(*ConnStats)) {
- f(&t.connStats)
- f(&t.cl.connStats)
-}
-
func (t *Torrent) hashingPiece(i pieceIndex) bool {
return t.pieces[i].hashing
}
const defaultMaxRequests = 2
ws := webseedPeer{
peer: Peer{
- t: t,
- outgoing: true,
- Network: "http",
- reconciledHandshakeStats: true,
+ cl: t.cl,
+ t: t,
+ outgoing: true,
+ Network: "http",
// TODO: Set ban prefix?
RemoteAddr: remoteAddrFromUrl(url),
callbacks: t.callbacks(),
}
return false
}
+
+// Increment a pieces-dirtied count on each dirtying conn, and once on each distinct upstream
+// aggregate.
+func (t *Torrent) incrementPiecesDirtiedStats(p *Piece, inc func(stats *ConnStats) bool) {
+ if len(p.dirtiers) == 0 {
+ // Avoid allocating map.
+ return
+ }
+ // 6 == (1 base + 2 peerImpls (PeerConn and webseedPeer)) * one AllConnStats each for the
+ // Torrent and the Client.
+ distinctUpstreamConnStats := make(map[*ConnStats]struct{}, 6)
+ for c := range p.dirtiers {
+ // Apply directly for each peer to avoid allocation.
+ inc(&c._stats)
+ // Collect distinct upstream connection stats.
+ count := 0
+ for cs := range c.upstreamConnStats() {
+ distinctUpstreamConnStats[cs] = struct{}{}
+ count++
+ }
+ // Each dirtier yields base and impl ConnStats for both the Torrent and the Client: 2*2 == 4.
+ panicif.NotEq(count, 4)
+ }
+ // TODO: Have a debug assert/dev logging version of this.
+ panicif.GreaterThan(len(distinctUpstreamConnStats), 6)
+ maps.Keys(distinctUpstreamConnStats)(inc)
+}
hostKey webseedHostKeyHandle
}
+func (*webseedPeer) allConnStatsImplField(stats *AllConnStats) *ConnStats {
+ return &stats.WebSeeds
+}
+
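With the PeerConns/WebSeeds split, each peerImpl simply names its bucket in AllConnStats. An editor's sketch of the extension point, using invented names (neither the type nor the field exists):

	// Editor's sketch: a hypothetical future transport would add its own field and return it.
	func (*utpPeer) allConnStatsImplField(stats *AllConnStats) *ConnStats {
		return &stats.Utp
	}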
func (me *webseedPeer) cancelAllRequests() {
// Is there any point to this? Won't we fail to receive a chunk and cancel anyway? Should we
// Close requests instead?