]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Move uploading to the connection writer
authorMatt Joiner <anacrolix@gmail.com>
Mon, 18 Sep 2017 02:09:08 +0000 (12:09 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 18 Sep 2017 02:09:08 +0000 (12:09 +1000)
client.go
connection.go
torrent.go

index cca19e1efb8d00a52031e487affbbb33f8e898b4..781ebb14c760036169bf093d1037cfffd90b0b34 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1142,7 +1142,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connect
        }
 }
 
-func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
+func (cl *Client) sendChunk(t *Torrent, c *connection, r request, msg func(pp.Message) bool) (more bool, err error) {
        // Count the chunk being sent, even if it isn't.
        b := make([]byte, r.Length)
        p := t.info.Piece(int(r.Index))
@@ -1151,9 +1151,9 @@ func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
                if err == nil {
                        panic("expected error")
                }
-               return err
+               return
        }
-       c.Post(pp.Message{
+       more = msg(pp.Message{
                Type:  pp.Piece,
                Index: r.Index,
                Begin: r.Begin,
@@ -1162,7 +1162,7 @@ func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
        c.chunksSent++
        uploadChunksPosted.Add(1)
        c.lastChunkSent = time.Now()
-       return nil
+       return
 }
 
 func (cl *Client) openNewConns(t *Torrent) {
index dc7b8390764d44ee745e34a038c00cd25429c5e9..752f79f42246f196a5b5376a8f1d9ee5354f3185 100644 (file)
@@ -99,6 +99,7 @@ type connection struct {
        pieceRequestOrder prioritybitmap.PriorityBitmap
 
        postedBuffer bytes.Buffer
+       uploadTimer  *time.Timer
        writerCond   sync.Cond
 }
 
@@ -297,25 +298,25 @@ func (cn *connection) PeerCancel(r request) bool {
        return true
 }
 
-func (cn *connection) Choke() {
+func (cn *connection) Choke(msg func(pp.Message) bool) bool {
        if cn.Choked {
-               return
+               return true
        }
-       cn.Post(pp.Message{
-               Type: pp.Choke,
-       })
        cn.PeerRequests = nil
        cn.Choked = true
+       return msg(pp.Message{
+               Type: pp.Choke,
+       })
 }
 
-func (cn *connection) Unchoke() {
+func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
        if !cn.Choked {
-               return
+               return true
        }
-       cn.Post(pp.Message{
+       cn.Choked = false
+       return msg(pp.Message{
                Type: pp.Unchoke,
        })
-       cn.Choked = false
 }
 
 func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
@@ -378,6 +379,7 @@ func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
                // have more write buffer space.
                cn.requestsLowWater = len(cn.requests) / 2
        }
+       cn.upload(msg)
 }
 
 // Writes buffers to the socket from the write channel.
@@ -757,33 +759,24 @@ func (c *connection) mainReadLoop() error {
                        c.tickleWriter()
                case pp.Interested:
                        c.PeerInterested = true
-                       c.upload()
+                       c.tickleWriter()
                case pp.NotInterested:
                        c.PeerInterested = false
-                       c.Choke()
+                       c.PeerRequests = nil
                case pp.Have:
                        err = c.peerSentHave(int(msg.Index))
                case pp.Request:
                        if c.Choked {
                                break
                        }
-                       if !c.PeerInterested {
-                               err = errors.New("peer sent request but isn't interested")
-                               break
-                       }
-                       if !t.havePiece(msg.Index.Int()) {
-                               // This isn't necessarily them screwing up. We can drop pieces
-                               // from our storage, and can't communicate this to peers
-                               // except by reconnecting.
-                               requestsReceivedForMissingPieces.Add(1)
-                               err = errors.New("peer requested piece we don't have")
+                       if len(c.PeerRequests) >= maxRequests {
                                break
                        }
                        if c.PeerRequests == nil {
                                c.PeerRequests = make(map[request]struct{}, maxRequests)
                        }
                        c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
-                       c.upload()
+                       c.tickleWriter()
                case pp.Cancel:
                        req := newRequest(msg.Index, msg.Begin, msg.Length)
                        if !c.PeerCancel(req) {
@@ -971,8 +964,6 @@ func (c *connection) receiveChunk(msg *pp.Message) {
        c.UsefulChunksReceived++
        c.lastUsefulChunkReceived = time.Now()
 
-       c.upload()
-
        // Need to record that it hasn't been written yet, before we attempt to do
        // anything with it.
        piece.incrementPendingWrites()
@@ -1016,40 +1007,45 @@ func (c *connection) receiveChunk(msg *pp.Message) {
 }
 
 // Also handles choking and unchoking of the remote peer.
-func (c *connection) upload() {
+func (c *connection) upload(msg func(pp.Message) bool) bool {
        t := c.t
        cl := t.cl
        if cl.config.NoUpload {
-               return
+               return true
        }
        if !c.PeerInterested {
-               return
+               return true
        }
        seeding := t.seeding()
        if !seeding && !c.peerHasWantedPieces() {
                // There's no reason to upload to this peer.
-               return
+               return true
        }
        // Breaking or completing this loop means we don't want to upload to the
        // peer anymore, and we choke them.
 another:
        for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
                // We want to upload to the peer.
-               c.Unchoke()
+               if !c.Unchoke(msg) {
+                       return false
+               }
                for r := range c.PeerRequests {
                        res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
+                       if !res.OK() {
+                               panic(res)
+                       }
                        delay := res.Delay()
                        if delay > 0 {
                                res.Cancel()
-                               go func() {
-                                       time.Sleep(delay)
-                                       cl.mu.Lock()
-                                       defer cl.mu.Unlock()
-                                       c.upload()
-                               }()
-                               return
+                               if c.uploadTimer == nil {
+                                       c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
+                               } else {
+                                       c.uploadTimer.Reset(delay)
+                               }
+                               // Hard to say what to return here.
+                               return true
                        }
-                       err := cl.sendChunk(t, c, r)
+                       more, err := cl.sendChunk(t, c, r, msg)
                        if err != nil {
                                i := int(r.Index)
                                if t.pieceComplete(i) {
@@ -1068,11 +1064,14 @@ another:
                                break another
                        }
                        delete(c.PeerRequests, r)
+                       if !more {
+                               return false
+                       }
                        goto another
                }
-               return
+               return true
        }
-       c.Choke()
+       return c.Choke(msg)
 }
 
 func (cn *connection) Drop() {
index 1d34b62cee122ca2373fea0d19b8f00d1e280e11..641f8fc078a1c44e5f0b9c4c2460acffa6334744 100644 (file)
@@ -1481,9 +1481,6 @@ func (t *Torrent) onPieceCompleted(piece int) {
        t.cancelRequestsForPiece(piece)
        for conn := range t.conns {
                conn.Have(piece)
-               // Could check here if peer doesn't have piece, but due to caching
-               // some peers may have said they have a piece but they don't.
-               conn.upload()
        }
 }