if tc, ok := nc.(*net.TCPConn); ok {
tc.SetLinger(0)
}
- c := cl.newConnection(nc)
+ c := newConnection(nc, &cl.mu)
c.Discovery = peerSourceIncoming
c.uTP = utp
cl.runReceivedConn(c)
// Performs initiator handshakes and returns a connection. Returns nil
// *connection if no connection for valid reasons.
func (cl *Client) handshakesConnection(nc net.Conn, t *Torrent, encrypted, utp bool) (c *connection, err error) {
- c = cl.newConnection(nc)
+ c = newConnection(nc, &cl.mu)
c.encrypted = encrypted
c.uTP = utp
err = nc.SetDeadline(time.Now().Add(handshakesTimeout))
if err != nil {
return err
}
+ c.readMsg(&msg)
c.lastMessageReceived = time.Now()
if msg.Keepalive {
receivedKeepalives.Add(1)
}
assertReadAllGreeting(t, r)
// After one read through, we can assume certain torrent statistics.
- assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesSent)
- assert.EqualValues(t, 8, seederTorrent.Stats().ChunksSent)
- // This is not a strict requirement. It is however interesting to follow.
- assert.EqualValues(t, 261, seederTorrent.Stats().BytesSent)
+ assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
+ assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
+ // These are not a strict requirement. It is however interesting to
+ // follow.
+ t.Logf("%#v", seederTorrent.Stats())
+ assert.EqualValues(t, 13, seederTorrent.Stats().DataBytesWritten)
+ assert.EqualValues(t, 8, seederTorrent.Stats().ChunksWritten)
+ assert.EqualValues(t, 13, leecherGreeting.Stats().DataBytesRead)
+ assert.EqualValues(t, 8, leecherGreeting.Stats().ChunksRead)
// Read through again for the cases where the torrent data size exceeds
// the size of the cache.
assertReadAllGreeting(t, r)
package torrent
import (
+ "io"
+ "sync"
+
pp "github.com/anacrolix/torrent/peer_protocol"
)
type ConnStats struct {
- ChunksSent int64 // Num piece messages sent.
- BytesSent int64 // Total bytes sent.
- DataBytesSent int64 // Data-only bytes sent.
+ // Torrent "piece" messages, or data chunks.
+ ChunksWritten int64 // Num piece messages sent.
+ ChunksRead int64
+ // Total bytes on the wire. Includes handshakes and encryption.
+ BytesWritten int64
+ BytesRead int64
+ // Data bytes, actual torrent data.
+ DataBytesWritten int64
+ DataBytesRead int64
}
-func (cs *ConnStats) wroteMsg(msg pp.Message) {
+func (cs *ConnStats) wroteMsg(msg *pp.Message) {
switch msg.Type {
case pp.Piece:
- cs.ChunksSent++
- cs.DataBytesSent += int64(len(msg.Piece))
+ cs.ChunksWritten++
+ cs.DataBytesWritten += int64(len(msg.Piece))
}
}
-func (cs *ConnStats) wroteBytes(b []byte) {
- cs.BytesSent += int64(len(b))
+func (cs *ConnStats) readMsg(msg *pp.Message) {
+ switch msg.Type {
+ case pp.Piece:
+ cs.ChunksRead++
+ cs.DataBytesRead += int64(len(msg.Piece))
+ }
+}
+
+func (cs *ConnStats) wroteBytes(n int64) {
+ cs.BytesWritten += n
+}
+
+func (cs *ConnStats) readBytes(n int64) {
+ cs.BytesRead += n
+}
+
+type connStatsReadWriter struct {
+ rw io.ReadWriter
+ l sync.Locker
+ c *connection
+}
+
+func (me connStatsReadWriter) Write(b []byte) (n int, err error) {
+ n, err = me.rw.Write(b)
+ me.l.Lock()
+ me.c.wroteBytes(int64(n))
+ me.l.Unlock()
+ return
+}
+
+func (me connStatsReadWriter) Read(b []byte) (n int, err error) {
+ n, err = me.rw.Read(b)
+ me.l.Lock()
+ me.c.readBytes(int64(n))
+ me.l.Unlock()
+ return
}
return &cn.t.cl.mu
}
-func (cl *Client) newConnection(nc net.Conn) (c *connection) {
+func newConnection(nc net.Conn, l sync.Locker) (c *connection) {
c = &connection{
conn: nc,
- rw: nc,
Choked: true,
PeerChoked: true,
PeerMaxRequests: 250,
}
+ c.rw = connStatsReadWriter{nc, l, c}
return
}
panic("short write")
}
cn.mu().Lock()
- cn.wroteMsg(msg)
- cn.wroteBytes(b)
+ cn.wroteMsg(&msg)
}
cn.outgoingUnbufferedMessagesNotEmpty.Clear()
cn.mu().Unlock()
}
}
-func (cn *connection) wroteMsg(msg pp.Message) {
+func (cn *connection) wroteMsg(msg *pp.Message) {
cn.stats.wroteMsg(msg)
cn.t.stats.wroteMsg(msg)
}
-func (cn *connection) wroteBytes(b []byte) {
- cn.stats.wroteBytes(b)
- cn.t.stats.wroteBytes(b)
+func (cn *connection) readMsg(msg *pp.Message) {
+ cn.stats.readMsg(msg)
+ cn.t.stats.readMsg(msg)
+}
+
+func (cn *connection) wroteBytes(n int64) {
+ cn.stats.wroteBytes(n)
+ if cn.t != nil {
+ cn.t.stats.wroteBytes(n)
+ }
+}
+
+func (cn *connection) readBytes(n int64) {
+ cn.stats.readBytes(n)
+ if cn.t != nil {
+ cn.t.stats.readBytes(n)
+ }
}
// Returns whether the connection is currently useful to us. We're seeding and
}
func (t *Torrent) Stats() TorrentStats {
+ t.cl.mu.Lock()
+ defer t.cl.mu.Unlock()
return t.stats
}
panic(len(t.conns))
}
t.conns = append(t.conns, c)
+ if c.t != nil {
+ panic("connection already associated with a torrent")
+ }
+ // Reconcile bytes transferred before connection was associated with a
+ // torrent.
+ t.stats.wroteBytes(c.stats.BytesWritten)
+ t.stats.readBytes(c.stats.BytesRead)
c.t = t
return true
}