ConnStats management is refactored to make this less tedious.
"github.com/anacrolix/missinggo/pubsub"
"github.com/anacrolix/missinggo/slices"
"github.com/anacrolix/sync"
+ "github.com/davecgh/go-spew/spew"
"github.com/dustin/go-humanize"
"github.com/google/btree"
"golang.org/x/time/rate"
dopplegangerAddrs map[string]struct{}
badPeerIPs map[string]struct{}
torrents map[metainfo.Hash]*Torrent
+ // An aggregate of stats over all connections.
+ stats ConnStats
}
func (cl *Client) BadPeerIPs() []string {
fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
writeDhtServerStatus(w, s)
})
+ spew.Fdump(w, cl.stats)
fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
fmt.Fprintln(w)
for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
}
func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) {
- t.reconcileHandshakeStats(c)
+ c.setTorrent(t)
if c.PeerID == cl.peerID {
if outgoing {
connsToSelf.Add(1)
return fmt.Errorf("data has bad offset in payload: %d", begin)
}
t.saveMetadataPiece(piece, payload[begin:])
- c.stats.ChunksReadUseful++
- c.t.stats.ChunksReadUseful++
+ c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUseful }))
c.lastUsefulChunkReceived = time.Now()
return t.maybeCompleteMetadata()
case pp.RequestMetadataExtensionMsgType:
}
}
-func (cs *ConnStats) wroteBytes(n int64) {
- cs.BytesWritten += n
+func (cs *ConnStats) incrementPiecesDirtiedGood() {
+ cs.PiecesDirtiedGood++
}
-func (cs *ConnStats) readBytes(n int64) {
- cs.BytesRead += n
+func (cs *ConnStats) incrementPiecesDirtiedBad() {
+ cs.PiecesDirtiedBad++
+}
+
+func add(n int64, f func(*ConnStats) *int64) func(*ConnStats) {
+ return func(cs *ConnStats) {
+ p := f(cs)
+ *p += n
+ }
}
type connStatsReadWriter struct {
cryptoMethod mse.CryptoMethod
Discovery peerSource
closed missinggo.Event
+ // Set true after we've added our ConnStats generated during handshake to
+ // other ConnStat instances as determined when the *Torrent became known.
+ reconciledHandshakeStats bool
stats ConnStats
func (cn *connection) wroteMsg(msg *pp.Message) {
messageTypesSent.Add(msg.Type.String(), 1)
- cn.stats.wroteMsg(msg)
- cn.t.stats.wroteMsg(msg)
+ cn.allStats(func(cs *ConnStats) { cs.wroteMsg(msg) })
}
func (cn *connection) readMsg(msg *pp.Message) {
- cn.stats.readMsg(msg)
- cn.t.stats.readMsg(msg)
+ cn.allStats(func(cs *ConnStats) { cs.readMsg(msg) })
}
-func (cn *connection) wroteBytes(n int64) {
- cn.stats.wroteBytes(n)
- if cn.t != nil {
- cn.t.stats.wroteBytes(n)
+// After handshake, we know what Torrent and Client stats to include for a
+// connection.
+func (cn *connection) postHandshakeStats(f func(*ConnStats)) {
+ t := cn.t
+ f(&t.stats.ConnStats)
+ f(&t.cl.stats)
+}
+
+// All ConnStats that include this connection. Some objects are not known
+// until the handshake is complete, after which it's expected to reconcile the
+// differences.
+func (cn *connection) allStats(f func(*ConnStats)) {
+ f(&cn.stats)
+ if cn.reconciledHandshakeStats {
+ cn.postHandshakeStats(f)
}
}
+func (cn *connection) wroteBytes(n int64) {
+ cn.allStats(add(n, func(cs *ConnStats) *int64 { return &cs.BytesWritten }))
+}
+
func (cn *connection) readBytes(n int64) {
- cn.stats.readBytes(n)
- if cn.t != nil {
- cn.t.stats.readBytes(n)
- }
+ cn.allStats(add(n, func(cs *ConnStats) *int64 { return &cs.BytesRead }))
}
// Returns whether the connection could be useful to us. We're seeding and
// Do we actually want this chunk?
if !t.wantPiece(req) {
unwantedChunksReceived.Add(1)
- c.stats.ChunksReadUnwanted++
- c.t.stats.ChunksReadUnwanted++
+ c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUnwanted }))
return
}
index := int(req.Index)
piece := &t.pieces[index]
- c.stats.ChunksReadUseful++
- c.t.stats.ChunksReadUseful++
- c.stats.BytesReadUsefulData += int64(len(msg.Piece))
- c.t.stats.BytesReadUsefulData += int64(len(msg.Piece))
+ c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUseful }))
+ c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *int64 { return &cs.BytesReadUsefulData }))
c.lastUsefulChunkReceived = time.Now()
// if t.fastestConn != c {
// log.Printf("setting fastest connection %p", c)
panic("connection already associated with a torrent")
}
c.t = t
- t.conns[c] = struct{}{}
+ t.reconcileHandshakeStats(c)
}
// Reconcile bytes transferred before connection was associated with a
// torrent.
func (t *Torrent) reconcileHandshakeStats(c *connection) {
- t.stats.wroteBytes(c.stats.BytesWritten)
- t.stats.readBytes(c.stats.BytesRead)
+ if c.stats != (ConnStats{
+ // Handshakes should only increment these fields:
+ BytesWritten: c.stats.BytesWritten,
+ BytesRead: c.stats.BytesRead,
+ }) {
+ panic("bad stats")
+ }
+ c.postHandshakeStats(func(cs *ConnStats) {
+ cs.BytesRead += c.stats.BytesRead
+ cs.BytesWritten += c.stats.BytesWritten
+ })
+ c.reconciledHandshakeStats = true
}
// Returns true if the connection is added.
if len(t.conns) >= t.maxEstablishedConns {
panic(len(t.conns))
}
- c.setTorrent(t)
+ t.conns[c] = struct{}{}
return true
}
}
if correct {
if len(touchers) != 0 {
- t.stats.PiecesDirtiedGood++
+ // Don't increment stats above connection-level for every involved
+ // connection.
+ t.allStats((*ConnStats).incrementPiecesDirtiedGood)
}
for _, c := range touchers {
- c.stats.PiecesDirtiedGood++
+ c.stats.incrementPiecesDirtiedGood()
}
err := p.Storage().MarkComplete()
if err != nil {
}
} else {
if len(touchers) != 0 {
- t.stats.PiecesDirtiedBad++
+ // Don't increment stats above connection-level for every involved
+ // connection.
+ t.allStats((*ConnStats).incrementPiecesDirtiedBad)
for _, c := range touchers {
// Y u do dis peer?!
- c.stats.PiecesDirtiedBad++
+ c.stats.incrementPiecesDirtiedBad()
}
slices.Sort(touchers, connLessTrusted)
if t.cl.config.Debug {
return
}())
}
+
+// All stats that include this Torrent. Useful when we want to increment
+// ConnStats but not for every connection.
+func (t *Torrent) allStats(f func(*ConnStats)) {
+ f(&t.stats.ConnStats)
+ f(&t.cl.stats)
+}