return fmt.Errorf("data has bad offset in payload: %d", begin)
}
t.saveMetadataPiece(piece, payload[begin:])
- c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUseful }))
+ c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
c.lastUsefulChunkReceived = time.Now()
return t.maybeCompleteMetadata()
case pp.RequestMetadataExtensionMsgType:
writeBuffer: new(bytes.Buffer),
}
c.writerCond.L = &cl.mu
- c.setRW(connStatsReadWriter{nc, &cl.mu, c})
+ c.setRW(connStatsReadWriter{nc, c})
c.r = &rateLimitedReader{
l: cl.downloadLimit,
r: c.r,
r.SetReadahead(ps.Readahead)
}
assertReadAllGreeting(t, r)
- assert.True(t, 13 <= seederTorrent.Stats().BytesWrittenData)
- assert.True(t, 8 <= seederTorrent.Stats().ChunksWritten)
- assert.True(t, 13 <= leecherTorrent.Stats().BytesReadData)
- assert.True(t, 8 <= leecherTorrent.Stats().ChunksRead)
+
+ seederStats := seederTorrent.Stats()
+ assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
+ assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
+
+ leecherStats := leecherTorrent.Stats()
+ assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
+ assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
+
// Try reading through again for the cases where the torrent data size
// exceeds the size of the cache.
assertReadAllGreeting(t, r)
import (
"io"
- "sync"
+ "log"
+ "reflect"
+ "sync/atomic"
pp "github.com/anacrolix/torrent/peer_protocol"
)
// is things sent to the peer, and Read is stuff received from them.
// ConnStats tracks byte and chunk transfer counters for a connection (and,
// aggregated, for torrents and the client). Every field is a Count so it can
// be updated atomically without holding the client lock.
type ConnStats struct {
	// Total bytes on the wire. Includes handshakes and encryption.
	BytesWritten     Count
	BytesWrittenData Count

	BytesRead           Count
	BytesReadData       Count
	BytesReadUsefulData Count

	ChunksWritten Count

	ChunksRead         Count
	ChunksReadUseful   Count
	ChunksReadUnwanted Count

	// Number of pieces data was written to, that subsequently passed verification.
	PiecesDirtiedGood Count

	// Number of pieces data was written to, that subsequently failed
	// verification. Note that a connection may not have been the sole dirtier
	// of a piece.
	PiecesDirtiedBad Count
}

// Copy returns a snapshot of the stats. Each field is loaded atomically, but
// the fields are not read at a single instant, so the snapshot is only
// per-field consistent.
//
// NOTE: the reflection walk assumes every field of ConnStats is a Count; keep
// that invariant when adding fields.
func (me *ConnStats) Copy() (ret ConnStats) {
	// Hoist the reflect.Values out of the loop; they are loop-invariant and
	// re-deriving them per field is pure overhead.
	src := reflect.ValueOf(me).Elem()
	dst := reflect.ValueOf(&ret).Elem()
	for i := 0; i < src.NumField(); i++ {
		n := src.Field(i).Addr().Interface().(*Count).Int64()
		dst.Field(i).Addr().Interface().(*Count).Add(n)
	}
	return
}

// Count is a goroutine-safe monotonic-free counter backed by sync/atomic.
type Count struct {
	n int64
}

// Add atomically adds n to the counter.
func (me *Count) Add(n int64) {
	atomic.AddInt64(&me.n, n)
}

// Int64 atomically loads the current value.
func (me *Count) Int64() int64 {
	return atomic.LoadInt64(&me.n)
}
// wroteMsg accumulates stats for a protocol message sent to the peer.
func (cs *ConnStats) wroteMsg(msg *pp.Message) {
	// TODO: Track messages and not just chunks.
	if msg.Type != pp.Piece {
		return
	}
	cs.ChunksWritten.Add(1)
	cs.BytesWrittenData.Add(int64(len(msg.Piece)))
}
// readMsg accumulates stats for a protocol message received from the peer.
func (cs *ConnStats) readMsg(msg *pp.Message) {
	if msg.Type != pp.Piece {
		return
	}
	cs.ChunksRead.Add(1)
	cs.BytesReadData.Add(int64(len(msg.Piece)))
}
// incrementPiecesDirtiedGood records a dirtied piece that later verified.
func (cs *ConnStats) incrementPiecesDirtiedGood() {
	cs.PiecesDirtiedGood.Add(1)
}
// incrementPiecesDirtiedBad records a dirtied piece that later failed
// verification.
func (cs *ConnStats) incrementPiecesDirtiedBad() {
	cs.PiecesDirtiedBad.Add(1)
}
-func add(n int64, f func(*ConnStats) *int64) func(*ConnStats) {
+func add(n int64, f func(*ConnStats) *Count) func(*ConnStats) {
return func(cs *ConnStats) {
p := f(cs)
- *p += n
+ p.Add(n)
}
}
type connStatsReadWriter struct {
rw io.ReadWriter
- l sync.Locker
c *connection
}
func (me connStatsReadWriter) Write(b []byte) (n int, err error) {
n, err = me.rw.Write(b)
- go func() {
- me.l.Lock()
- me.c.wroteBytes(int64(n))
- me.l.Unlock()
- }()
+ me.c.wroteBytes(int64(n))
return
}
func (me connStatsReadWriter) Read(b []byte) (n int, err error) {
n, err = me.rw.Read(b)
- go func() {
- me.l.Lock()
- me.c.readBytes(int64(n))
- me.l.Unlock()
- }()
+ me.c.readBytes(int64(n))
return
}
// }
func (cn *connection) downloadRate() float64 {
- return float64(cn.stats.BytesReadUsefulData) / cn.cumInterest().Seconds()
+ return float64(cn.stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds()
}
func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
// The actual value to use as the maximum outbound requests.
func (cn *connection) nominalMaxRequests() (ret int) {
- return int(clamp(1, int64(cn.PeerMaxRequests), max(64, cn.stats.ChunksReadUseful-(cn.stats.ChunksRead-cn.stats.ChunksReadUseful))))
+ return int(clamp(1, int64(cn.PeerMaxRequests), max(64, cn.stats.ChunksReadUseful.Int64()-(cn.stats.ChunksRead.Int64()-cn.stats.ChunksReadUseful.Int64()))))
}
func (cn *connection) onPeerSentCancel(r request) {
// connection.
// postHandshakeStats applies f to the stats of the connection's torrent and
// of the client, i.e. every aggregation level above the connection itself.
func (cn *connection) postHandshakeStats(f func(*ConnStats)) {
	f(&cn.t.stats)
	f(&cn.t.cl.stats)
}
}
// wroteBytes records n wire bytes written, across every stats level.
func (cn *connection) wroteBytes(n int64) {
	written := func(cs *ConnStats) *Count { return &cs.BytesWritten }
	cn.allStats(add(n, written))
}
// readBytes records n wire bytes read, across every stats level.
func (cn *connection) readBytes(n int64) {
	read := func(cs *ConnStats) *Count { return &cs.BytesRead }
	cn.allStats(add(n, read))
}
// Returns whether the connection could be useful to us. We're seeding and
// Do we actually want this chunk?
if !t.wantPiece(req) {
unwantedChunksReceived.Add(1)
- c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUnwanted }))
+ c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUnwanted }))
return
}
index := int(req.Index)
piece := &t.pieces[index]
- c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUseful }))
- c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *int64 { return &cs.BytesReadUsefulData }))
+ c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
+ c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
c.lastUsefulChunkReceived = time.Now()
// if t.fastestConn != c {
// log.Printf("setting fastest connection %p", c)
return false
}
// Don't upload more than 100 KiB more than we download.
- if c.stats.BytesWrittenData >= c.stats.BytesReadData+100<<10 {
+ if c.stats.BytesWrittenData.Int64() >= c.stats.BytesReadData.Int64()+100<<10 {
return false
}
return true
}
func (cn *connection) netGoodPiecesDirtied() int64 {
- return cn.stats.PiecesDirtiedGood - cn.stats.PiecesDirtiedBad
+ return cn.stats.PiecesDirtiedGood.Int64() - cn.stats.PiecesDirtiedBad.Int64()
}
func (c *connection) peerHasWantedPieces() bool {
// different pieces.
connPieceInclinationPool sync.Pool
// Torrent-level statistics.
- stats TorrentStats
+ stats ConnStats
// Count of each request across active connections.
pendingRequests map[request]int
heap.Init(&wcs)
for wcs.Len() != 0 {
c := heap.Pop(&wcs).(*connection)
- if c.stats.ChunksReadUnwanted >= 6 && c.stats.ChunksReadUnwanted > c.stats.ChunksReadUseful {
+ if c.stats.ChunksReadUnwanted.Int64() >= 6 && c.stats.ChunksReadUnwanted.Int64() > c.stats.ChunksReadUseful.Int64() {
return c
}
// If the connection is in the worst half of the established
// The following are vaguely described in BEP 3.
Left: t.bytesLeftAnnounce(),
- Uploaded: t.stats.BytesWrittenData,
+ Uploaded: t.stats.BytesWrittenData.Int64(),
// There's no mention of wasted or unwanted download in the BEP.
- Downloaded: t.stats.BytesReadUsefulData,
+ Downloaded: t.stats.BytesReadUsefulData.Int64(),
}
}
return t.statsLocked()
}
-func (t *Torrent) statsLocked() TorrentStats {
- t.stats.ActivePeers = len(t.conns)
- t.stats.HalfOpenPeers = len(t.halfOpen)
- t.stats.PendingPeers = t.peers.Len()
- t.stats.TotalPeers = t.numTotalPeers()
- t.stats.ConnectedSeeders = 0
+func (t *Torrent) statsLocked() (ret TorrentStats) {
+ ret.ActivePeers = len(t.conns)
+ ret.HalfOpenPeers = len(t.halfOpen)
+ ret.PendingPeers = t.peers.Len()
+ ret.TotalPeers = t.numTotalPeers()
+ ret.ConnectedSeeders = 0
for c := range t.conns {
if all, ok := c.peerHasAllPieces(); all && ok {
- t.stats.ConnectedSeeders++
+ ret.ConnectedSeeders++
}
}
- return t.stats
+ ret.ConnStats = t.stats.Copy()
+ return
}
// The total number of peers in the torrent.
panic("bad stats")
}
c.postHandshakeStats(func(cs *ConnStats) {
- cs.BytesRead += c.stats.BytesRead
- cs.BytesWritten += c.stats.BytesWritten
+ cs.BytesRead.Add(c.stats.BytesRead.Int64())
+ cs.BytesWritten.Add(c.stats.BytesWritten.Int64())
})
c.reconciledHandshakeStats = true
}
// allStats applies f to every ConnStats aggregate that includes this Torrent
// (the torrent's own stats and the client's). Useful when we want to
// increment ConnStats but not for every connection.
func (t *Torrent) allStats(f func(*ConnStats)) {
	for _, cs := range []*ConnStats{&t.stats, &t.cl.stats} {
		f(cs)
	}
}