]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Merge a bunch of stuff into ConnStats and refactor connection.upload
authorMatt Joiner <anacrolix@gmail.com>
Fri, 2 Feb 2018 08:04:56 +0000 (19:04 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 2 Feb 2018 08:04:56 +0000 (19:04 +1100)
client.go
conn_stats.go
connection.go
connection_test.go
torrent.go
torrent_stats.go

index 54a8b6ef7c9e009df2f54824faa5e40b169093ea..efe5f736776a5a5733ed381106f9412108f7ac47 100644 (file)
--- a/client.go
+++ b/client.go
@@ -950,7 +950,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connect
                        return fmt.Errorf("data has bad offset in payload: %d", begin)
                }
                t.saveMetadataPiece(piece, payload[begin:])
-               c.UsefulChunksReceived++
+               c.stats.ChunksReadUseful++
                c.lastUsefulChunkReceived = time.Now()
                return t.maybeCompleteMetadata()
        case pp.RequestMetadataExtensionMsgType:
index 541e2c5fae3ca61adeedece0f8df59829947c343..89650c9aeeab5d56d9c094c10331bcd9e9638f5b 100644 (file)
@@ -7,16 +7,34 @@ import (
        pp "github.com/anacrolix/torrent/peer_protocol"
 )
 
+// Various connection-level metrics. At the Torrent level these are
+// aggregates. Chunks are messages with data payloads. Data is actual torrent
+// content without any overhead. Useful is something we needed locally.
+// Unwanted is something we didn't ask for (but may still be useful). Written
+// is things sent to the peer, and Read is stuff received from them.
 type ConnStats struct {
-       // Torrent "piece" messages, or data chunks.
-       ChunksWritten int64 // Num piece messages sent.
-       ChunksRead    int64
        // Total bytes on the wire. Includes handshakes and encryption.
        BytesWritten int64
        BytesRead    int64
-       // Data bytes, actual torrent data.
-       DataBytesWritten int64
-       DataBytesRead    int64
+
+       // The rest of the stats only occur on connections after handshakes.
+
+       ChunksWritten int64
+
+       ChunksRead         int64
+       ChunksReadUseful   int64
+       ChunksReadUnwanted int64
+
+       DataBytesWritten    int64
+       DataBytesRead       int64
+       UsefulDataBytesRead int64
+
+       // Number of pieces data was written to, that subsequently passed verification.
+       GoodPiecesDirtied int64
+       // Number of pieces data was written to, that subsequently failed
+       // verification. Note that a connection may not have been the sole dirtier
+       // of a piece.
+       BadPiecesDirtied int64
 }
 
 func (cs *ConnStats) wroteMsg(msg *pp.Message) {
index fbf86e56600cb993db8fae4f85a1cf864e5c391b..b04c9c144bf7e4c055ed4026115732df5b1060f8 100644 (file)
@@ -50,12 +50,7 @@ type connection struct {
        uTP             bool
        closed          missinggo.Event
 
-       stats                  ConnStats
-       UnwantedChunksReceived int
-       UsefulChunksReceived   int
-       chunksSent             int
-       goodPiecesDirtied      int
-       badPiecesDirtied       int
+       stats ConnStats
 
        lastMessageReceived     time.Time
        completedHandshake      time.Time
@@ -213,9 +208,9 @@ func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
                "    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
                cn.completedString(),
                len(cn.peerTouchedPieces),
-               cn.UsefulChunksReceived,
-               cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
-               cn.chunksSent,
+               cn.stats.ChunksReadUseful,
+               cn.stats.ChunksReadUnwanted+cn.stats.ChunksReadUseful,
+               cn.stats.ChunksWritten,
                cn.numLocalRequests(),
                len(cn.PeerRequests),
                cn.statusFlags(),
@@ -1079,14 +1074,14 @@ func (c *connection) receiveChunk(msg *pp.Message) {
        // Do we actually want this chunk?
        if !t.wantPiece(req) {
                unwantedChunksReceived.Add(1)
-               c.UnwantedChunksReceived++
+               c.stats.ChunksReadUnwanted++
                return
        }
 
        index := int(req.Index)
        piece := &t.pieces[index]
 
-       c.UsefulChunksReceived++
+       c.stats.ChunksReadUseful++
        c.lastUsefulChunkReceived = time.Now()
        // if t.fastestConn != c {
        // log.Printf("setting fastest connection %p", c)
@@ -1142,51 +1137,59 @@ func (c *connection) receiveChunk(msg *pp.Message) {
        t.publishPieceChange(int(req.Index))
 }
 
-// Also handles choking and unchoking of the remote peer.
-func (c *connection) upload(msg func(pp.Message) bool) bool {
-       t := c.t
-       cl := t.cl
-       if cl.config.NoUpload {
-               return true
+func (c *connection) uploadAllowed() bool {
+       if c.t.cl.config.NoUpload {
+               return false
        }
-       if !c.PeerInterested {
+       if c.t.seeding() {
                return true
        }
-       seeding := t.seeding()
-       if !seeding && !c.peerHasWantedPieces() {
-               // There's no reason to upload to this peer.
-               return true
+       if !c.peerHasWantedPieces() {
+               return false
+       }
+       // Don't upload more than 100 KiB more than we download.
+       if c.stats.DataBytesWritten >= c.stats.DataBytesRead+100<<10 {
+               return false
        }
+       return true
+}
+
+func (c *connection) setRetryUploadTimer(delay time.Duration) {
+       if c.uploadTimer == nil {
+               c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
+       } else {
+               c.uploadTimer.Reset(delay)
+       }
+}
+
+// Also handles choking and unchoking of the remote peer.
+func (c *connection) upload(msg func(pp.Message) bool) bool {
        // Breaking or completing this loop means we don't want to upload to the
        // peer anymore, and we choke them.
 another:
-       for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
+       for c.uploadAllowed() {
                // We want to upload to the peer.
                if !c.Unchoke(msg) {
                        return false
                }
                for r := range c.PeerRequests {
-                       res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
+                       res := c.t.cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
                        if !res.OK() {
                                panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
                        }
                        delay := res.Delay()
                        if delay > 0 {
                                res.Cancel()
-                               if c.uploadTimer == nil {
-                                       c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
-                               } else {
-                                       c.uploadTimer.Reset(delay)
-                               }
+                               c.setRetryUploadTimer(delay)
                                // Hard to say what to return here.
                                return true
                        }
                        more, err := c.sendChunk(r, msg)
                        if err != nil {
                                i := int(r.Index)
-                               if t.pieceComplete(i) {
-                                       t.updatePieceCompletion(i)
-                                       if !t.pieceComplete(i) {
+                               if c.t.pieceComplete(i) {
+                                       c.t.updatePieceCompletion(i)
+                                       if !c.t.pieceComplete(i) {
                                                // We had the piece, but not anymore.
                                                break another
                                        }
@@ -1214,8 +1217,8 @@ func (cn *connection) Drop() {
        cn.t.dropConnection(cn)
 }
 
-func (cn *connection) netGoodPiecesDirtied() int {
-       return cn.goodPiecesDirtied - cn.badPiecesDirtied
+func (cn *connection) netGoodPiecesDirtied() int64 {
+       return cn.stats.GoodPiecesDirtied - cn.stats.BadPiecesDirtied
 }
 
 func (c *connection) peerHasWantedPieces() bool {
@@ -1275,7 +1278,6 @@ func (c *connection) sendChunk(r request, msg func(pp.Message) bool) (more bool,
                Begin: r.Begin,
                Piece: b,
        })
-       c.chunksSent++
        uploadChunksPosted.Add(1)
        c.lastChunkSent = time.Now()
        return
index d3c299b0e0a99338367851dedac69cac5c258c83..5eb19d43c6ce05a7348de856cabf31ed2347f852 100644 (file)
@@ -130,7 +130,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
        }
        w.Close()
        require.NoError(b, <-mrlErr)
-       require.EqualValues(b, b.N, cn.UsefulChunksReceived)
+       require.EqualValues(b, b.N, cn.stats.ChunksReadUseful)
 }
 
 func TestConnectionReceiveBadChunkIndex(t *testing.T) {
index 5bf85913dc3b6bb4c30f2b6793a6325dd58b8faf..d5a45f4cdd0ec5fcf29bb60d1c87588b74c12d4e 100644 (file)
@@ -839,7 +839,7 @@ func (t *Torrent) worstBadConn() *connection {
        heap.Init(&wcs)
        for wcs.Len() != 0 {
                c := heap.Pop(&wcs).(*connection)
-               if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
+               if c.stats.ChunksReadUnwanted >= 6 && c.stats.ChunksReadUnwanted > c.stats.ChunksReadUseful {
                        return c
                }
                if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
@@ -1516,7 +1516,7 @@ func (t *Torrent) pieceHashed(piece int, correct bool) {
        p.everHashed = true
        if correct {
                for _, c := range touchers {
-                       c.goodPiecesDirtied++
+                       c.stats.GoodPiecesDirtied++
                }
                err := p.Storage().MarkComplete()
                if err != nil {
@@ -1526,11 +1526,11 @@ func (t *Torrent) pieceHashed(piece int, correct bool) {
                if len(touchers) != 0 {
                        for _, c := range touchers {
                                // Y u do dis peer?!
-                               c.badPiecesDirtied++
+                               c.stats.BadPiecesDirtied++
                        }
                        slices.Sort(touchers, connLessTrusted)
                        if t.cl.config.Debug {
-                               log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int) {
+                               log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int64) {
                                        for _, c := range touchers {
                                                ret = append(ret, c.netGoodPiecesDirtied())
                                        }
index f155a0d83ea369e2fa6539b9fe157ace55b18039..edc4ac8ee7281310470249727e195afb9298be69 100644 (file)
@@ -1,7 +1,9 @@
 package torrent
 
 type TorrentStats struct {
-       ConnStats // Aggregates stats over all connections past and present.
+       // Aggregates stats over all connections past and present. Some values may
+       // not have much meaning in the aggregate context.
+       ConnStats
 
        // Ordered by expected descending quantities (if all is well).
        TotalPeers       int