]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Track ConnStats with atomics
authorMatt Joiner <anacrolix@gmail.com>
Tue, 12 Jun 2018 10:21:53 +0000 (20:21 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 12 Jun 2018 10:21:53 +0000 (20:21 +1000)
client.go
client_test.go
conn_stats.go
connection.go
torrent.go

index 2a65b90689068eeb84f2a128ec97eb0072f0df73..975f0eeec9d667486340c1d39c7b44b0558dd755 100644 (file)
--- a/client.go
+++ b/client.go
@@ -908,7 +908,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connect
                        return fmt.Errorf("data has bad offset in payload: %d", begin)
                }
                t.saveMetadataPiece(piece, payload[begin:])
-               c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUseful }))
+               c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
                c.lastUsefulChunkReceived = time.Now()
                return t.maybeCompleteMetadata()
        case pp.RequestMetadataExtensionMsgType:
@@ -1159,7 +1159,7 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
                writeBuffer:     new(bytes.Buffer),
        }
        c.writerCond.L = &cl.mu
-       c.setRW(connStatsReadWriter{nc, &cl.mu, c})
+       c.setRW(connStatsReadWriter{nc, c})
        c.r = &rateLimitedReader{
                l: cl.downloadLimit,
                r: c.r,
index d74b4e7d95f0488effe702834b22e7092dba6d38..63bcd6637bd0cf9d3802e849353307f45c44e3f4 100644 (file)
@@ -415,10 +415,15 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
                r.SetReadahead(ps.Readahead)
        }
        assertReadAllGreeting(t, r)
-       assert.True(t, 13 <= seederTorrent.Stats().BytesWrittenData)
-       assert.True(t, 8 <= seederTorrent.Stats().ChunksWritten)
-       assert.True(t, 13 <= leecherTorrent.Stats().BytesReadData)
-       assert.True(t, 8 <= leecherTorrent.Stats().ChunksRead)
+
+       seederStats := seederTorrent.Stats()
+       assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
+       assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
+
+       leecherStats := leecherTorrent.Stats()
+       assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
+       assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
+
        // Try reading through again for the cases where the torrent data size
        // exceeds the size of the cache.
        assertReadAllGreeting(t, r)
index 1f45fa10320ad9d7c941ab596e7c78d9bac32445..79554bc0d46518f5e0c8c4f4de5317e5f19e521d 100644 (file)
@@ -2,7 +2,9 @@ package torrent
 
 import (
        "io"
-       "sync"
+       "log"
+       "reflect"
+       "sync/atomic"
 
        pp "github.com/anacrolix/torrent/peer_protocol"
 )
@@ -14,81 +16,92 @@ import (
 // is things sent to the peer, and Read is stuff received from them.
 type ConnStats struct {
        // Total bytes on the wire. Includes handshakes and encryption.
-       BytesWritten     int64
-       BytesWrittenData int64
+       BytesWritten     Count
+       BytesWrittenData Count
 
-       BytesRead           int64
-       BytesReadData       int64
-       BytesReadUsefulData int64
+       BytesRead           Count
+       BytesReadData       Count
+       BytesReadUsefulData Count
 
-       ChunksWritten int64
+       ChunksWritten Count
 
-       ChunksRead         int64
-       ChunksReadUseful   int64
-       ChunksReadUnwanted int64
+       ChunksRead         Count
+       ChunksReadUseful   Count
+       ChunksReadUnwanted Count
 
        // Number of pieces data was written to, that subsequently passed verification.
-       PiecesDirtiedGood int64
+       PiecesDirtiedGood Count
        // Number of pieces data was written to, that subsequently failed
        // verification. Note that a connection may not have been the sole dirtier
        // of a piece.
-       PiecesDirtiedBad int64
+       PiecesDirtiedBad Count
+}
+
+func (me *ConnStats) Copy() (ret ConnStats) {
+       for i := 0; i < reflect.TypeOf(ConnStats{}).NumField(); i++ {
+               n := reflect.ValueOf(me).Elem().Field(i).Addr().Interface().(*Count).Int64()
+               reflect.ValueOf(&ret).Elem().Field(i).Addr().Interface().(*Count).Add(n)
+       }
+       return
+}
+
+type Count struct {
+       n int64
+}
+
+func (me *Count) Add(n int64) {
+       atomic.AddInt64(&me.n, n)
+}
+
+func (me *Count) Int64() int64 {
+       return atomic.LoadInt64(&me.n)
 }
 
 func (cs *ConnStats) wroteMsg(msg *pp.Message) {
        // TODO: Track messages and not just chunks.
        switch msg.Type {
        case pp.Piece:
-               cs.ChunksWritten++
-               cs.BytesWrittenData += int64(len(msg.Piece))
+               cs.ChunksWritten.Add(1)
+               cs.BytesWrittenData.Add(int64(len(msg.Piece)))
        }
 }
 
 func (cs *ConnStats) readMsg(msg *pp.Message) {
        switch msg.Type {
        case pp.Piece:
-               cs.ChunksRead++
-               cs.BytesReadData += int64(len(msg.Piece))
+               cs.ChunksRead.Add(1)
+               cs.BytesReadData.Add(int64(len(msg.Piece)))
        }
 }
 
 func (cs *ConnStats) incrementPiecesDirtiedGood() {
-       cs.PiecesDirtiedGood++
+       cs.PiecesDirtiedGood.Add(1)
 }
 
 func (cs *ConnStats) incrementPiecesDirtiedBad() {
-       cs.PiecesDirtiedBad++
+       cs.PiecesDirtiedBad.Add(1)
 }
 
-func add(n int64, f func(*ConnStats) *int64) func(*ConnStats) {
+func add(n int64, f func(*ConnStats) *Count) func(*ConnStats) {
        return func(cs *ConnStats) {
                p := f(cs)
-               *p += n
+               p.Add(n)
        }
 }
 
 type connStatsReadWriter struct {
        rw io.ReadWriter
-       l  sync.Locker
        c  *connection
 }
 
 func (me connStatsReadWriter) Write(b []byte) (n int, err error) {
        n, err = me.rw.Write(b)
-       go func() {
-               me.l.Lock()
-               me.c.wroteBytes(int64(n))
-               me.l.Unlock()
-       }()
+       me.c.wroteBytes(int64(n))
        return
 }
 
 func (me connStatsReadWriter) Read(b []byte) (n int, err error) {
        n, err = me.rw.Read(b)
-       go func() {
-               me.l.Lock()
-               me.c.readBytes(int64(n))
-               me.l.Unlock()
-       }()
+       me.c.readBytes(int64(n))
        return
 }
index 6aa007b7369b2bfbaaaeacb7077ab79407f5421b..70aa1ccbd165e2a7deebc7a3b8bd6fe144d56e44 100644 (file)
@@ -223,7 +223,7 @@ func (cn *connection) statusFlags() (ret string) {
 // }
 
 func (cn *connection) downloadRate() float64 {
-       return float64(cn.stats.BytesReadUsefulData) / cn.cumInterest().Seconds()
+       return float64(cn.stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds()
 }
 
 func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
@@ -324,7 +324,7 @@ func (cn *connection) requestedMetadataPiece(index int) bool {
 
 // The actual value to use as the maximum outbound requests.
 func (cn *connection) nominalMaxRequests() (ret int) {
-       return int(clamp(1, int64(cn.PeerMaxRequests), max(64, cn.stats.ChunksReadUseful-(cn.stats.ChunksRead-cn.stats.ChunksReadUseful))))
+       return int(clamp(1, int64(cn.PeerMaxRequests), max(64, cn.stats.ChunksReadUseful.Int64()-(cn.stats.ChunksRead.Int64()-cn.stats.ChunksReadUseful.Int64()))))
 }
 
 func (cn *connection) onPeerSentCancel(r request) {
@@ -854,7 +854,7 @@ func (cn *connection) readMsg(msg *pp.Message) {
 // connection.
 func (cn *connection) postHandshakeStats(f func(*ConnStats)) {
        t := cn.t
-       f(&t.stats.ConnStats)
+       f(&t.stats)
        f(&t.cl.stats)
 }
 
@@ -869,11 +869,11 @@ func (cn *connection) allStats(f func(*ConnStats)) {
 }
 
 func (cn *connection) wroteBytes(n int64) {
-       cn.allStats(add(n, func(cs *ConnStats) *int64 { return &cs.BytesWritten }))
+       cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten }))
 }
 
 func (cn *connection) readBytes(n int64) {
-       cn.allStats(add(n, func(cs *ConnStats) *int64 { return &cs.BytesRead }))
+       cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead }))
 }
 
 // Returns whether the connection could be useful to us. We're seeding and
@@ -1199,15 +1199,15 @@ func (c *connection) receiveChunk(msg *pp.Message) {
        // Do we actually want this chunk?
        if !t.wantPiece(req) {
                unwantedChunksReceived.Add(1)
-               c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUnwanted }))
+               c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUnwanted }))
                return
        }
 
        index := int(req.Index)
        piece := &t.pieces[index]
 
-       c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUseful }))
-       c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *int64 { return &cs.BytesReadUsefulData }))
+       c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
+       c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
        c.lastUsefulChunkReceived = time.Now()
        // if t.fastestConn != c {
        // log.Printf("setting fastest connection %p", c)
@@ -1283,7 +1283,7 @@ func (c *connection) uploadAllowed() bool {
                return false
        }
        // Don't upload more than 100 KiB more than we download.
-       if c.stats.BytesWrittenData >= c.stats.BytesReadData+100<<10 {
+       if c.stats.BytesWrittenData.Int64() >= c.stats.BytesReadData.Int64()+100<<10 {
                return false
        }
        return true
@@ -1353,7 +1353,7 @@ func (cn *connection) Drop() {
 }
 
 func (cn *connection) netGoodPiecesDirtied() int64 {
-       return cn.stats.PiecesDirtiedGood - cn.stats.PiecesDirtiedBad
+       return cn.stats.PiecesDirtiedGood.Int64() - cn.stats.PiecesDirtiedBad.Int64()
 }
 
 func (c *connection) peerHasWantedPieces() bool {
index 9b3ff31e82aea1cdf01770b617045998d39aabfd..fc1fb5dfdf2aa6f3a592c8c90064e0f1f5993d70 100644 (file)
@@ -135,7 +135,7 @@ type Torrent struct {
        // different pieces.
        connPieceInclinationPool sync.Pool
        // Torrent-level statistics.
-       stats TorrentStats
+       stats ConnStats
 
        // Count of each request across active connections.
        pendingRequests map[request]int
@@ -851,7 +851,7 @@ func (t *Torrent) worstBadConn() *connection {
        heap.Init(&wcs)
        for wcs.Len() != 0 {
                c := heap.Pop(&wcs).(*connection)
-               if c.stats.ChunksReadUnwanted >= 6 && c.stats.ChunksReadUnwanted > c.stats.ChunksReadUseful {
+               if c.stats.ChunksReadUnwanted.Int64() >= 6 && c.stats.ChunksReadUnwanted.Int64() > c.stats.ChunksReadUseful.Int64() {
                        return c
                }
                // If the connection is in the worst half of the established
@@ -1343,9 +1343,9 @@ func (t *Torrent) announceRequest() tracker.AnnounceRequest {
                // The following are vaguely described in BEP 3.
 
                Left:     t.bytesLeftAnnounce(),
-               Uploaded: t.stats.BytesWrittenData,
+               Uploaded: t.stats.BytesWrittenData.Int64(),
                // There's no mention of wasted or unwanted download in the BEP.
-               Downloaded: t.stats.BytesReadUsefulData,
+               Downloaded: t.stats.BytesReadUsefulData.Int64(),
        }
 }
 
@@ -1440,18 +1440,19 @@ func (t *Torrent) Stats() TorrentStats {
        return t.statsLocked()
 }
 
-func (t *Torrent) statsLocked() TorrentStats {
-       t.stats.ActivePeers = len(t.conns)
-       t.stats.HalfOpenPeers = len(t.halfOpen)
-       t.stats.PendingPeers = t.peers.Len()
-       t.stats.TotalPeers = t.numTotalPeers()
-       t.stats.ConnectedSeeders = 0
+func (t *Torrent) statsLocked() (ret TorrentStats) {
+       ret.ActivePeers = len(t.conns)
+       ret.HalfOpenPeers = len(t.halfOpen)
+       ret.PendingPeers = t.peers.Len()
+       ret.TotalPeers = t.numTotalPeers()
+       ret.ConnectedSeeders = 0
        for c := range t.conns {
                if all, ok := c.peerHasAllPieces(); all && ok {
-                       t.stats.ConnectedSeeders++
+                       ret.ConnectedSeeders++
                }
        }
-       return t.stats
+       ret.ConnStats = t.stats.Copy()
+       return
 }
 
 // The total number of peers in the torrent.
@@ -1485,8 +1486,8 @@ func (t *Torrent) reconcileHandshakeStats(c *connection) {
                panic("bad stats")
        }
        c.postHandshakeStats(func(cs *ConnStats) {
-               cs.BytesRead += c.stats.BytesRead
-               cs.BytesWritten += c.stats.BytesWritten
+               cs.BytesRead.Add(c.stats.BytesRead.Int64())
+               cs.BytesWritten.Add(c.stats.BytesWritten.Int64())
        })
        c.reconciledHandshakeStats = true
 }
@@ -1764,6 +1765,6 @@ func (t *Torrent) AddClientPeer(cl *Client) {
 // All stats that include this Torrent. Useful when we want to increment
 // ConnStats but not for every connection.
 func (t *Torrent) allStats(f func(*ConnStats)) {
-       f(&t.stats.ConnStats)
+       f(&t.stats)
        f(&t.cl.stats)
 }