]> Sergey Matveev's repositories - btrtrc.git/blobdiff - connection.go
Merge a bunch of stuff into ConnStats and refactor connection.upload
[btrtrc.git] / connection.go
index fbf86e56600cb993db8fae4f85a1cf864e5c391b..b04c9c144bf7e4c055ed4026115732df5b1060f8 100644 (file)
@@ -50,12 +50,7 @@ type connection struct {
        uTP             bool
        closed          missinggo.Event
 
-       stats                  ConnStats
-       UnwantedChunksReceived int
-       UsefulChunksReceived   int
-       chunksSent             int
-       goodPiecesDirtied      int
-       badPiecesDirtied       int
+       stats ConnStats
 
        lastMessageReceived     time.Time
        completedHandshake      time.Time
@@ -213,9 +208,9 @@ func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
                "    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
                cn.completedString(),
                len(cn.peerTouchedPieces),
-               cn.UsefulChunksReceived,
-               cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
-               cn.chunksSent,
+               cn.stats.ChunksReadUseful,
+               cn.stats.ChunksReadUnwanted+cn.stats.ChunksReadUseful,
+               cn.stats.ChunksWritten,
                cn.numLocalRequests(),
                len(cn.PeerRequests),
                cn.statusFlags(),
@@ -1079,14 +1074,14 @@ func (c *connection) receiveChunk(msg *pp.Message) {
        // Do we actually want this chunk?
        if !t.wantPiece(req) {
                unwantedChunksReceived.Add(1)
-               c.UnwantedChunksReceived++
+               c.stats.ChunksReadUnwanted++
                return
        }
 
        index := int(req.Index)
        piece := &t.pieces[index]
 
-       c.UsefulChunksReceived++
+       c.stats.ChunksReadUseful++
        c.lastUsefulChunkReceived = time.Now()
        // if t.fastestConn != c {
        // log.Printf("setting fastest connection %p", c)
@@ -1142,51 +1137,59 @@ func (c *connection) receiveChunk(msg *pp.Message) {
        t.publishPieceChange(int(req.Index))
 }
 
-// Also handles choking and unchoking of the remote peer.
-func (c *connection) upload(msg func(pp.Message) bool) bool {
-       t := c.t
-       cl := t.cl
-       if cl.config.NoUpload {
-               return true
+func (c *connection) uploadAllowed() bool {
+       if c.t.cl.config.NoUpload {
+               return false
        }
-       if !c.PeerInterested {
+       if c.t.seeding() {
                return true
        }
-       seeding := t.seeding()
-       if !seeding && !c.peerHasWantedPieces() {
-               // There's no reason to upload to this peer.
-               return true
+       if !c.peerHasWantedPieces() {
+               return false
+       }
+       // Don't upload more than 100 KiB more than we download.
+       if c.stats.DataBytesWritten >= c.stats.DataBytesRead+100<<10 {
+               return false
        }
+       return true
+}
+
+func (c *connection) setRetryUploadTimer(delay time.Duration) {
+       if c.uploadTimer == nil {
+               c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
+       } else {
+               c.uploadTimer.Reset(delay)
+       }
+}
+
+// Also handles choking and unchoking of the remote peer.
+func (c *connection) upload(msg func(pp.Message) bool) bool {
        // Breaking or completing this loop means we don't want to upload to the
        // peer anymore, and we choke them.
 another:
-       for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
+       for c.uploadAllowed() {
                // We want to upload to the peer.
                if !c.Unchoke(msg) {
                        return false
                }
                for r := range c.PeerRequests {
-                       res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
+                       res := c.t.cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
                        if !res.OK() {
                                panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
                        }
                        delay := res.Delay()
                        if delay > 0 {
                                res.Cancel()
-                               if c.uploadTimer == nil {
-                                       c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
-                               } else {
-                                       c.uploadTimer.Reset(delay)
-                               }
+                               c.setRetryUploadTimer(delay)
                                // Hard to say what to return here.
                                return true
                        }
                        more, err := c.sendChunk(r, msg)
                        if err != nil {
                                i := int(r.Index)
-                               if t.pieceComplete(i) {
-                                       t.updatePieceCompletion(i)
-                                       if !t.pieceComplete(i) {
+                               if c.t.pieceComplete(i) {
+                                       c.t.updatePieceCompletion(i)
+                                       if !c.t.pieceComplete(i) {
                                                // We had the piece, but not anymore.
                                                break another
                                        }
@@ -1214,8 +1217,8 @@ func (cn *connection) Drop() {
        cn.t.dropConnection(cn)
 }
 
-func (cn *connection) netGoodPiecesDirtied() int {
-       return cn.goodPiecesDirtied - cn.badPiecesDirtied
+func (cn *connection) netGoodPiecesDirtied() int64 {
+       return cn.stats.GoodPiecesDirtied - cn.stats.BadPiecesDirtied
 }
 
 func (c *connection) peerHasWantedPieces() bool {
@@ -1275,7 +1278,6 @@ func (c *connection) sendChunk(r request, msg func(pp.Message) bool) (more bool,
                Begin: r.Begin,
                Piece: b,
        })
-       c.chunksSent++
        uploadChunksPosted.Add(1)
        c.lastChunkSent = time.Now()
        return