]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add ConnStats at Client level
authorMatt Joiner <anacrolix@gmail.com>
Sat, 9 Jun 2018 23:18:52 +0000 (09:18 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sat, 9 Jun 2018 23:20:33 +0000 (09:20 +1000)
ConnStats management is refactored to make this less tedious.

client.go
conn_stats.go
connection.go
torrent.go

index f5a82300de051abc625a38b3864b77efb6f99147..2bcd81f832500059876ca6d14a52fc66a054f764 100644 (file)
--- a/client.go
+++ b/client.go
@@ -22,6 +22,7 @@ import (
        "github.com/anacrolix/missinggo/pubsub"
        "github.com/anacrolix/missinggo/slices"
        "github.com/anacrolix/sync"
+       "github.com/davecgh/go-spew/spew"
        "github.com/dustin/go-humanize"
        "github.com/google/btree"
        "golang.org/x/time/rate"
@@ -62,6 +63,8 @@ type Client struct {
        dopplegangerAddrs map[string]struct{}
        badPeerIPs        map[string]struct{}
        torrents          map[metainfo.Hash]*Torrent
+       // An aggregate of stats over all connections.
+       stats ConnStats
 }
 
 func (cl *Client) BadPeerIPs() []string {
@@ -123,6 +126,7 @@ func (cl *Client) WriteStatus(_w io.Writer) {
                fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
                writeDhtServerStatus(w, s)
        })
+       spew.Fdump(w, cl.stats)
        fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
        fmt.Fprintln(w)
        for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
@@ -764,7 +768,7 @@ func (cl *Client) runReceivedConn(c *connection) {
 }
 
 func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) {
-       t.reconcileHandshakeStats(c)
+       c.setTorrent(t)
        if c.PeerID == cl.peerID {
                if outgoing {
                        connsToSelf.Add(1)
@@ -899,8 +903,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connect
                        return fmt.Errorf("data has bad offset in payload: %d", begin)
                }
                t.saveMetadataPiece(piece, payload[begin:])
-               c.stats.ChunksReadUseful++
-               c.t.stats.ChunksReadUseful++
+               c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUseful }))
                c.lastUsefulChunkReceived = time.Now()
                return t.maybeCompleteMetadata()
        case pp.RequestMetadataExtensionMsgType:
index 9e01104bf14b9a97a64c75cdb07d8dd789d3975b..1f45fa10320ad9d7c941ab596e7c78d9bac32445 100644 (file)
@@ -52,12 +52,19 @@ func (cs *ConnStats) readMsg(msg *pp.Message) {
        }
 }
 
-func (cs *ConnStats) wroteBytes(n int64) {
-       cs.BytesWritten += n
+func (cs *ConnStats) incrementPiecesDirtiedGood() {
+       cs.PiecesDirtiedGood++
 }
 
-func (cs *ConnStats) readBytes(n int64) {
-       cs.BytesRead += n
+func (cs *ConnStats) incrementPiecesDirtiedBad() {
+       cs.PiecesDirtiedBad++
+}
+
+func add(n int64, f func(*ConnStats) *int64) func(*ConnStats) {
+       return func(cs *ConnStats) {
+               p := f(cs)
+               *p += n
+       }
 }
 
 type connStatsReadWriter struct {
index 9246072cc49b9b6bbab9709d61867e7ecb85d7b8..e330bd297ee0591925079af4860fa5e5e4f2e193 100644 (file)
@@ -50,6 +50,9 @@ type connection struct {
        cryptoMethod    mse.CryptoMethod
        Discovery       peerSource
        closed          missinggo.Event
+       // Set true after we've added our ConnStats generated during handshake to
+       // other ConnStat instances as determined when the *Torrent became known.
+       reconciledHandshakeStats bool
 
        stats ConnStats
 
@@ -839,27 +842,37 @@ func (c *connection) requestPendingMetadata() {
 
 func (cn *connection) wroteMsg(msg *pp.Message) {
        messageTypesSent.Add(msg.Type.String(), 1)
-       cn.stats.wroteMsg(msg)
-       cn.t.stats.wroteMsg(msg)
+       cn.allStats(func(cs *ConnStats) { cs.wroteMsg(msg) })
 }
 
 func (cn *connection) readMsg(msg *pp.Message) {
-       cn.stats.readMsg(msg)
-       cn.t.stats.readMsg(msg)
+       cn.allStats(func(cs *ConnStats) { cs.readMsg(msg) })
 }
 
-func (cn *connection) wroteBytes(n int64) {
-       cn.stats.wroteBytes(n)
-       if cn.t != nil {
-               cn.t.stats.wroteBytes(n)
+// After handshake, we know what Torrent and Client stats to include for a
+// connection.
+func (cn *connection) postHandshakeStats(f func(*ConnStats)) {
+       t := cn.t
+       f(&t.stats.ConnStats)
+       f(&t.cl.stats)
+}
+
+// All ConnStats that include this connection. Some objects are not known
+// until the handshake is complete, after which it's expected to reconcile the
+// differences.
+func (cn *connection) allStats(f func(*ConnStats)) {
+       f(&cn.stats)
+       if cn.reconciledHandshakeStats {
+               cn.postHandshakeStats(f)
        }
 }
 
+func (cn *connection) wroteBytes(n int64) {
+       cn.allStats(add(n, func(cs *ConnStats) *int64 { return &cs.BytesWritten }))
+}
+
 func (cn *connection) readBytes(n int64) {
-       cn.stats.readBytes(n)
-       if cn.t != nil {
-               cn.t.stats.readBytes(n)
-       }
+       cn.allStats(add(n, func(cs *ConnStats) *int64 { return &cs.BytesRead }))
 }
 
 // Returns whether the connection could be useful to us. We're seeding and
@@ -1185,18 +1198,15 @@ func (c *connection) receiveChunk(msg *pp.Message) {
        // Do we actually want this chunk?
        if !t.wantPiece(req) {
                unwantedChunksReceived.Add(1)
-               c.stats.ChunksReadUnwanted++
-               c.t.stats.ChunksReadUnwanted++
+               c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUnwanted }))
                return
        }
 
        index := int(req.Index)
        piece := &t.pieces[index]
 
-       c.stats.ChunksReadUseful++
-       c.t.stats.ChunksReadUseful++
-       c.stats.BytesReadUsefulData += int64(len(msg.Piece))
-       c.t.stats.BytesReadUsefulData += int64(len(msg.Piece))
+       c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUseful }))
+       c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *int64 { return &cs.BytesReadUsefulData }))
        c.lastUsefulChunkReceived = time.Now()
        // if t.fastestConn != c {
        // log.Printf("setting fastest connection %p", c)
@@ -1415,5 +1425,5 @@ func (c *connection) setTorrent(t *Torrent) {
                panic("connection already associated with a torrent")
        }
        c.t = t
-       t.conns[c] = struct{}{}
+       t.reconcileHandshakeStats(c)
 }
index 4a784300bd99c6be128d88d193a88e9ec2c969a4..b8b4282b100f2fb9f7d82fbaea2d2f434d0ded2b 100644 (file)
@@ -1474,8 +1474,18 @@ func (t *Torrent) numTotalPeers() int {
 // Reconcile bytes transferred before connection was associated with a
 // torrent.
 func (t *Torrent) reconcileHandshakeStats(c *connection) {
-       t.stats.wroteBytes(c.stats.BytesWritten)
-       t.stats.readBytes(c.stats.BytesRead)
+       if c.stats != (ConnStats{
+               // Handshakes should only increment these fields:
+               BytesWritten: c.stats.BytesWritten,
+               BytesRead:    c.stats.BytesRead,
+       }) {
+               panic("bad stats")
+       }
+       c.postHandshakeStats(func(cs *ConnStats) {
+               cs.BytesRead += c.stats.BytesRead
+               cs.BytesWritten += c.stats.BytesWritten
+       })
+       c.reconciledHandshakeStats = true
 }
 
 // Returns true if the connection is added.
@@ -1519,7 +1529,7 @@ func (t *Torrent) addConnection(c *connection, outgoing bool) bool {
        if len(t.conns) >= t.maxEstablishedConns {
                panic(len(t.conns))
        }
-       c.setTorrent(t)
+       t.conns[c] = struct{}{}
        return true
 }
 
@@ -1575,10 +1585,12 @@ func (t *Torrent) pieceHashed(piece int, correct bool) {
        }
        if correct {
                if len(touchers) != 0 {
-                       t.stats.PiecesDirtiedGood++
+                       // Don't increment stats above connection-level for every involved
+                       // connection.
+                       t.allStats((*ConnStats).incrementPiecesDirtiedGood)
                }
                for _, c := range touchers {
-                       c.stats.PiecesDirtiedGood++
+                       c.stats.incrementPiecesDirtiedGood()
                }
                err := p.Storage().MarkComplete()
                if err != nil {
@@ -1586,10 +1598,12 @@ func (t *Torrent) pieceHashed(piece int, correct bool) {
                }
        } else {
                if len(touchers) != 0 {
-                       t.stats.PiecesDirtiedBad++
+                       // Don't increment stats above connection-level for every involved
+                       // connection.
+                       t.allStats((*ConnStats).incrementPiecesDirtiedBad)
                        for _, c := range touchers {
                                // Y u do dis peer?!
-                               c.stats.PiecesDirtiedBad++
+                               c.stats.incrementPiecesDirtiedBad()
                        }
                        slices.Sort(touchers, connLessTrusted)
                        if t.cl.config.Debug {
@@ -1746,3 +1760,10 @@ func (t *Torrent) AddClientPeer(cl *Client) {
                return
        }())
 }
+
+// All stats that include this Torrent. Useful when we want to increment
+// ConnStats but not for every connection.
+func (t *Torrent) allStats(f func(*ConnStats)) {
+       f(&t.stats.ConnStats)
+       f(&t.cl.stats)
+}