From fbe0ded844a21edfca12ca45f400a8e65bb1cc60 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 12 Jul 2016 16:42:04 +1000 Subject: [PATCH] Add connection read stats --- client.go | 5 +++-- client_test.go | 13 +++++++---- conn_stats.go | 59 +++++++++++++++++++++++++++++++++++++++++++------- connection.go | 29 ++++++++++++++++++------- torrent.go | 9 ++++++++ 5 files changed, 93 insertions(+), 22 deletions(-) diff --git a/client.go b/client.go index 1491b25d..a8266584 100644 --- a/client.go +++ b/client.go @@ -435,7 +435,7 @@ func (cl *Client) incomingConnection(nc net.Conn, utp bool) { if tc, ok := nc.(*net.TCPConn); ok { tc.SetLinger(0) } - c := cl.newConnection(nc) + c := newConnection(nc, &cl.mu) c.Discovery = peerSourceIncoming c.uTP = utp cl.runReceivedConn(c) @@ -575,7 +575,7 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) { // Performs initiator handshakes and returns a connection. Returns nil // *connection if no connection for valid reasons. func (cl *Client) handshakesConnection(nc net.Conn, t *Torrent, encrypted, utp bool) (c *connection, err error) { - c = cl.newConnection(nc) + c = newConnection(nc, &cl.mu) c.encrypted = encrypted c.uTP = utp err = nc.SetDeadline(time.Now().Add(handshakesTimeout)) @@ -1153,6 +1153,7 @@ func (cl *Client) connectionLoop(t *Torrent, c *connection) error { if err != nil { return err } + c.readMsg(&msg) c.lastMessageReceived = time.Now() if msg.Keepalive { receivedKeepalives.Add(1) diff --git a/client_test.go b/client_test.go index 014f9a0a..9a75a47d 100644 --- a/client_test.go +++ b/client_test.go @@ -385,10 +385,15 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) { } assertReadAllGreeting(t, r) // After one read through, we can assume certain torrent statistics. - assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesSent) - assert.EqualValues(t, 8, seederTorrent.Stats().ChunksSent) - // This is not a strict requirement. It is however interesting to follow. - assert.EqualValues(t, 261, seederTorrent.Stats().BytesSent) + assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten) + assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten) + // These are not a strict requirement. It is however interesting to + // follow. + t.Logf("%#v", seederTorrent.Stats()) + assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten) + assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten) + assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead) + assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead) // Read through again for the cases where the torrent data size exceeds // the size of the cache. assertReadAllGreeting(t, r) diff --git a/conn_stats.go b/conn_stats.go index 5455b49a..541e2c5f 100644 --- a/conn_stats.go +++ b/conn_stats.go @@ -1,23 +1,66 @@ package torrent import ( + "io" + "sync" + pp "github.com/anacrolix/torrent/peer_protocol" ) type ConnStats struct { - ChunksSent int64 // Num piece messages sent. - BytesSent int64 // Total bytes sent. - DataBytesSent int64 // Data-only bytes sent. + // Torrent "piece" messages, or data chunks. + ChunksWritten int64 // Num piece messages sent. + ChunksRead int64 + // Total bytes on the wire. Includes handshakes and encryption. + BytesWritten int64 + BytesRead int64 + // Data bytes, actual torrent data. + DataBytesWritten int64 + DataBytesRead int64 } -func (cs *ConnStats) wroteMsg(msg pp.Message) { +func (cs *ConnStats) wroteMsg(msg *pp.Message) { switch msg.Type { case pp.Piece: - cs.ChunksSent++ - cs.DataBytesSent += int64(len(msg.Piece)) + cs.ChunksWritten++ + cs.DataBytesWritten += int64(len(msg.Piece)) } } -func (cs *ConnStats) wroteBytes(b []byte) { - cs.BytesSent += int64(len(b)) +func (cs *ConnStats) readMsg(msg *pp.Message) { + switch msg.Type { + case pp.Piece: + cs.ChunksRead++ + cs.DataBytesRead += int64(len(msg.Piece)) + } +} + +func (cs *ConnStats) wroteBytes(n int64) { + cs.BytesWritten += n +} + +func (cs *ConnStats) readBytes(n int64) { + cs.BytesRead += n +} + +type connStatsReadWriter struct { + rw io.ReadWriter + l sync.Locker + c *connection +} + +func (me connStatsReadWriter) Write(b []byte) (n int, err error) { + n, err = me.rw.Write(b) + me.l.Lock() + me.c.wroteBytes(int64(n)) + me.l.Unlock() + return +} + +func (me connStatsReadWriter) Read(b []byte) (n int, err error) { + n, err = me.rw.Read(b) + me.l.Lock() + me.c.readBytes(int64(n)) + me.l.Unlock() + return } diff --git a/connection.go b/connection.go index 80ef2999..63bb58fc 100644 --- a/connection.go +++ b/connection.go @@ -100,15 +100,15 @@ func (cn *connection) mu() sync.Locker { return &cn.t.cl.mu } -func (cl *Client) newConnection(nc net.Conn) (c *connection) { +func newConnection(nc net.Conn, l sync.Locker) (c *connection) { c = &connection{ conn: nc, - rw: nc, Choked: true, PeerChoked: true, PeerMaxRequests: 250, } + c.rw = connStatsReadWriter{nc, l, c} return } @@ -427,8 +427,7 @@ func (cn *connection) writer(keepAliveTimeout time.Duration) { panic("short write") } cn.mu().Lock() - cn.wroteMsg(msg) - cn.wroteBytes(b) + cn.wroteMsg(&msg) } cn.outgoingUnbufferedMessagesNotEmpty.Clear() cn.mu().Unlock() @@ -645,14 +644,28 @@ func (c *connection) requestPendingMetadata() { } } -func (cn *connection) wroteMsg(msg pp.Message) { +func (cn *connection) wroteMsg(msg *pp.Message) { cn.stats.wroteMsg(msg) cn.t.stats.wroteMsg(msg) } -func (cn *connection) wroteBytes(b []byte) { - cn.stats.wroteBytes(b) - cn.t.stats.wroteBytes(b) +func (cn *connection) readMsg(msg *pp.Message) { + cn.stats.readMsg(msg) + cn.t.stats.readMsg(msg) +} + +func (cn *connection) wroteBytes(n int64) { + cn.stats.wroteBytes(n) + if cn.t != nil { + cn.t.stats.wroteBytes(n) + } +} + +func (cn *connection) readBytes(n int64) { + cn.stats.readBytes(n) + if cn.t != nil { + cn.t.stats.readBytes(n) + } } // Returns whether the connection is currently useful to us. We're seeding and diff --git a/torrent.go b/torrent.go index 179d61ea..cb17f49b 100644 --- a/torrent.go +++ b/torrent.go @@ -1267,6 +1267,8 @@ func (t *Torrent) addPeers(peers []Peer) { } func (t *Torrent) Stats() TorrentStats { + t.cl.mu.Lock() + defer t.cl.mu.Unlock() return t.stats } @@ -1300,6 +1302,13 @@ func (t *Torrent) addConnection(c *connection) bool { panic(len(t.conns)) } t.conns = append(t.conns, c) + if c.t != nil { + panic("connection already associated with a torrent") + } + // Reconcile bytes transferred before connection was associated with a + // torrent. + t.stats.wroteBytes(c.stats.BytesWritten) + t.stats.readBytes(c.stats.BytesRead) c.t = t return true } -- 2.50.0