]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add connection read stats
authorMatt Joiner <anacrolix@gmail.com>
Tue, 12 Jul 2016 06:42:04 +0000 (16:42 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 12 Jul 2016 06:42:04 +0000 (16:42 +1000)
client.go
client_test.go
conn_stats.go
connection.go
torrent.go

index 1491b25d547841e9075d062c921678467f283711..a82665846b21f5b4e8b8cdf7ef31b49274bacf53 100644 (file)
--- a/client.go
+++ b/client.go
@@ -435,7 +435,7 @@ func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
        if tc, ok := nc.(*net.TCPConn); ok {
                tc.SetLinger(0)
        }
-       c := cl.newConnection(nc)
+       c := newConnection(nc, &cl.mu)
        c.Discovery = peerSourceIncoming
        c.uTP = utp
        cl.runReceivedConn(c)
@@ -575,7 +575,7 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
 // Performs initiator handshakes and returns a connection. Returns nil
 // *connection if no connection for valid reasons.
 func (cl *Client) handshakesConnection(nc net.Conn, t *Torrent, encrypted, utp bool) (c *connection, err error) {
-       c = cl.newConnection(nc)
+       c = newConnection(nc, &cl.mu)
        c.encrypted = encrypted
        c.uTP = utp
        err = nc.SetDeadline(time.Now().Add(handshakesTimeout))
@@ -1153,6 +1153,7 @@ func (cl *Client) connectionLoop(t *Torrent, c *connection) error {
                if err != nil {
                        return err
                }
+               c.readMsg(&msg)
                c.lastMessageReceived = time.Now()
                if msg.Keepalive {
                        receivedKeepalives.Add(1)
index 014f9a0aa53f29df2aea3baf6bebc2463599503d..9a75a47d847cb7c963268a57609c0053241688b0 100644 (file)
@@ -385,10 +385,15 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
        }
        assertReadAllGreeting(t, r)
        // After one read through, we can assume certain torrent statistics.
-       assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesSent)
-       assert.EqualValues(t, 8, seederTorrent.Stats().ChunksSent)
-       // This is not a strict requirement. It is however interesting to follow.
-       assert.EqualValues(t, 261, seederTorrent.Stats().BytesSent)
+       assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
+       assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
+       // These are not a strict requirement. It is however interesting to
+       // follow.
+       t.Logf("%#v", seederTorrent.Stats())
+       assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
+       assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
+       assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
+       assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
        // Read through again for the cases where the torrent data size exceeds
        // the size of the cache.
        assertReadAllGreeting(t, r)
index 5455b49ad40b432c2eb8bb88127428bd90d90fe2..541e2c5fae3ca61adeedece0f8df59829947c343 100644 (file)
@@ -1,23 +1,66 @@
 package torrent
 
 import (
+       "io"
+       "sync"
+
        pp "github.com/anacrolix/torrent/peer_protocol"
 )
 
 type ConnStats struct {
-       ChunksSent    int64 // Num piece messages sent.
-       BytesSent     int64 // Total bytes sent.
-       DataBytesSent int64 // Data-only bytes sent.
+       // Torrent "piece" messages, or data chunks.
+       ChunksWritten int64 // Num piece messages sent.
+       ChunksRead    int64
+       // Total bytes on the wire. Includes handshakes and encryption.
+       BytesWritten int64
+       BytesRead    int64
+       // Data bytes, actual torrent data.
+       DataBytesWritten int64
+       DataBytesRead    int64
 }
 
-func (cs *ConnStats) wroteMsg(msg pp.Message) {
+func (cs *ConnStats) wroteMsg(msg *pp.Message) {
        switch msg.Type {
        case pp.Piece:
-               cs.ChunksSent++
-               cs.DataBytesSent += int64(len(msg.Piece))
+               cs.ChunksWritten++
+               cs.DataBytesWritten += int64(len(msg.Piece))
        }
 }
 
-func (cs *ConnStats) wroteBytes(b []byte) {
-       cs.BytesSent += int64(len(b))
+func (cs *ConnStats) readMsg(msg *pp.Message) {
+       switch msg.Type {
+       case pp.Piece:
+               cs.ChunksRead++
+               cs.DataBytesRead += int64(len(msg.Piece))
+       }
+}
+
+func (cs *ConnStats) wroteBytes(n int64) {
+       cs.BytesWritten += n
+}
+
+func (cs *ConnStats) readBytes(n int64) {
+       cs.BytesRead += n
+}
+
+type connStatsReadWriter struct {
+       rw io.ReadWriter
+       l  sync.Locker
+       c  *connection
+}
+
+func (me connStatsReadWriter) Write(b []byte) (n int, err error) {
+       n, err = me.rw.Write(b)
+       me.l.Lock()
+       me.c.wroteBytes(int64(n))
+       me.l.Unlock()
+       return
+}
+
+func (me connStatsReadWriter) Read(b []byte) (n int, err error) {
+       n, err = me.rw.Read(b)
+       me.l.Lock()
+       me.c.readBytes(int64(n))
+       me.l.Unlock()
+       return
 }
index 80ef299997e396decbfbd729934a5d6ce86534e8..63bb58fce0c8839faac231d76ca122ae6098bdf0 100644 (file)
@@ -100,15 +100,15 @@ func (cn *connection) mu() sync.Locker {
        return &cn.t.cl.mu
 }
 
-func (cl *Client) newConnection(nc net.Conn) (c *connection) {
+func newConnection(nc net.Conn, l sync.Locker) (c *connection) {
        c = &connection{
                conn: nc,
-               rw:   nc,
 
                Choked:          true,
                PeerChoked:      true,
                PeerMaxRequests: 250,
        }
+       c.rw = connStatsReadWriter{nc, l, c}
        return
 }
 
@@ -427,8 +427,7 @@ func (cn *connection) writer(keepAliveTimeout time.Duration) {
                                panic("short write")
                        }
                        cn.mu().Lock()
-                       cn.wroteMsg(msg)
-                       cn.wroteBytes(b)
+                       cn.wroteMsg(&msg)
                }
                cn.outgoingUnbufferedMessagesNotEmpty.Clear()
                cn.mu().Unlock()
@@ -645,14 +644,28 @@ func (c *connection) requestPendingMetadata() {
        }
 }
 
-func (cn *connection) wroteMsg(msg pp.Message) {
+func (cn *connection) wroteMsg(msg *pp.Message) {
        cn.stats.wroteMsg(msg)
        cn.t.stats.wroteMsg(msg)
 }
 
-func (cn *connection) wroteBytes(b []byte) {
-       cn.stats.wroteBytes(b)
-       cn.t.stats.wroteBytes(b)
+func (cn *connection) readMsg(msg *pp.Message) {
+       cn.stats.readMsg(msg)
+       cn.t.stats.readMsg(msg)
+}
+
+func (cn *connection) wroteBytes(n int64) {
+       cn.stats.wroteBytes(n)
+       if cn.t != nil {
+               cn.t.stats.wroteBytes(n)
+       }
+}
+
+func (cn *connection) readBytes(n int64) {
+       cn.stats.readBytes(n)
+       if cn.t != nil {
+               cn.t.stats.readBytes(n)
+       }
 }
 
 // Returns whether the connection is currently useful to us. We're seeding and
index 179d61eae19323b7195d34961a9f31bc534acdf2..cb17f49b3b0649b42b88b5b56ae9516f29c3857b 100644 (file)
@@ -1267,6 +1267,8 @@ func (t *Torrent) addPeers(peers []Peer) {
 }
 
 func (t *Torrent) Stats() TorrentStats {
+       t.cl.mu.Lock()
+       defer t.cl.mu.Unlock()
        return t.stats
 }
 
@@ -1300,6 +1302,13 @@ func (t *Torrent) addConnection(c *connection) bool {
                panic(len(t.conns))
        }
        t.conns = append(t.conns, c)
+       if c.t != nil {
+               panic("connection already associated with a torrent")
+       }
+       // Reconcile bytes transferred before connection was associated with a
+       // torrent.
+       t.stats.wroteBytes(c.stats.BytesWritten)
+       t.stats.readBytes(c.stats.BytesRead)
        c.t = t
        return true
 }