From bb53c97d385fff16bf0b3e3980f3d20b7def6a6b Mon Sep 17 00:00:00 2001
From: Matt Joiner <anacrolix@gmail.com>
Date: Mon, 18 Sep 2017 12:09:08 +1000
Subject: [PATCH] Move uploading to the connection writer

---
 client.go     |  8 +++---
 connection.go | 77 +++++++++++++++++++++++++--------------------------
 torrent.go    |  3 --
 3 files changed, 42 insertions(+), 46 deletions(-)

diff --git a/client.go b/client.go
index cca19e1e..781ebb14 100644
--- a/client.go
+++ b/client.go
@@ -1142,7 +1142,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connect
 	}
 }
 
-func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
+func (cl *Client) sendChunk(t *Torrent, c *connection, r request, msg func(pp.Message) bool) (more bool, err error) {
 	// Count the chunk being sent, even if it isn't.
 	b := make([]byte, r.Length)
 	p := t.info.Piece(int(r.Index))
@@ -1151,9 +1151,9 @@ func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
 		if err == nil {
 			panic("expected error")
 		}
-		return err
+		return
 	}
-	c.Post(pp.Message{
+	more = msg(pp.Message{
 		Type:  pp.Piece,
 		Index: r.Index,
 		Begin: r.Begin,
@@ -1162,7 +1162,7 @@ func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
 	c.chunksSent++
 	uploadChunksPosted.Add(1)
 	c.lastChunkSent = time.Now()
-	return nil
+	return
 }
 
 func (cl *Client) openNewConns(t *Torrent) {
diff --git a/connection.go b/connection.go
index dc7b8390..752f79f4 100644
--- a/connection.go
+++ b/connection.go
@@ -99,6 +99,7 @@ type connection struct {
 	pieceRequestOrder prioritybitmap.PriorityBitmap
 
 	postedBuffer bytes.Buffer
+	uploadTimer  *time.Timer
 	writerCond   sync.Cond
 }
 
@@ -297,25 +298,25 @@ func (cn *connection) PeerCancel(r request) bool {
 	return true
 }
 
-func (cn *connection) Choke() {
+func (cn *connection) Choke(msg func(pp.Message) bool) bool {
 	if cn.Choked {
-		return
+		return true
 	}
-	cn.Post(pp.Message{
-		Type: pp.Choke,
-	})
 	cn.PeerRequests = nil
 	cn.Choked = true
+	return msg(pp.Message{
+		Type: pp.Choke,
+	})
 }
 
-func (cn *connection) Unchoke() {
+func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
 	if !cn.Choked {
-		return
+		return true
 	}
-	cn.Post(pp.Message{
+	cn.Choked = false
+	return msg(pp.Message{
 		Type: pp.Unchoke,
 	})
-	cn.Choked = false
 }
 
 func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
@@ -378,6 +379,7 @@ func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
 		// have more write buffer space.
 		cn.requestsLowWater = len(cn.requests) / 2
 	}
+	cn.upload(msg)
 }
 
 // Writes buffers to the socket from the write channel.
@@ -757,33 +759,24 @@ func (c *connection) mainReadLoop() error {
 			c.tickleWriter()
 		case pp.Interested:
 			c.PeerInterested = true
-			c.upload()
+			c.tickleWriter()
 		case pp.NotInterested:
 			c.PeerInterested = false
-			c.Choke()
+			c.PeerRequests = nil
 		case pp.Have:
 			err = c.peerSentHave(int(msg.Index))
 		case pp.Request:
 			if c.Choked {
 				break
 			}
-			if !c.PeerInterested {
-				err = errors.New("peer sent request but isn't interested")
-				break
-			}
-			if !t.havePiece(msg.Index.Int()) {
-				// This isn't necessarily them screwing up. We can drop pieces
-				// from our storage, and can't communicate this to peers
-				// except by reconnecting.
-				requestsReceivedForMissingPieces.Add(1)
-				err = errors.New("peer requested piece we don't have")
+			if len(c.PeerRequests) >= maxRequests {
 				break
 			}
 			if c.PeerRequests == nil {
 				c.PeerRequests = make(map[request]struct{}, maxRequests)
 			}
 			c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
-			c.upload()
+			c.tickleWriter()
 		case pp.Cancel:
 			req := newRequest(msg.Index, msg.Begin, msg.Length)
 			if !c.PeerCancel(req) {
@@ -971,8 +964,6 @@ func (c *connection) receiveChunk(msg *pp.Message) {
 	c.UsefulChunksReceived++
 	c.lastUsefulChunkReceived = time.Now()
 
-	c.upload()
-
 	// Need to record that it hasn't been written yet, before we attempt to do
 	// anything with it.
 	piece.incrementPendingWrites()
@@ -1016,40 +1007,45 @@ func (c *connection) receiveChunk(msg *pp.Message) {
 }
 
 // Also handles choking and unchoking of the remote peer.
-func (c *connection) upload() {
+func (c *connection) upload(msg func(pp.Message) bool) bool {
 	t := c.t
 	cl := t.cl
 	if cl.config.NoUpload {
-		return
+		return true
 	}
 	if !c.PeerInterested {
-		return
+		return true
 	}
 	seeding := t.seeding()
 	if !seeding && !c.peerHasWantedPieces() {
 		// There's no reason to upload to this peer.
-		return
+		return true
 	}
 	// Breaking or completing this loop means we don't want to upload to the
 	// peer anymore, and we choke them.
 another:
 	for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
 		// We want to upload to the peer.
-		c.Unchoke()
+		if !c.Unchoke(msg) {
+			return false
+		}
 		for r := range c.PeerRequests {
 			res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
+			if !res.OK() {
+				panic(res)
+			}
 			delay := res.Delay()
 			if delay > 0 {
 				res.Cancel()
-				go func() {
-					time.Sleep(delay)
-					cl.mu.Lock()
-					defer cl.mu.Unlock()
-					c.upload()
-				}()
-				return
+				if c.uploadTimer == nil {
+					c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
+				} else {
+					c.uploadTimer.Reset(delay)
+				}
+				// Hard to say what to return here.
+				return true
 			}
-			err := cl.sendChunk(t, c, r)
+			more, err := cl.sendChunk(t, c, r, msg)
 			if err != nil {
 				i := int(r.Index)
 				if t.pieceComplete(i) {
@@ -1068,11 +1064,14 @@ another:
 				break another
 			}
 			delete(c.PeerRequests, r)
+			if !more {
+				return false
+			}
 			goto another
 		}
-		return
+		return true
 	}
-	c.Choke()
+	return c.Choke(msg)
 }
 
 func (cn *connection) Drop() {
diff --git a/torrent.go b/torrent.go
index 1d34b62c..641f8fc0 100644
--- a/torrent.go
+++ b/torrent.go
@@ -1481,9 +1481,6 @@ func (t *Torrent) onPieceCompleted(piece int) {
 	t.cancelRequestsForPiece(piece)
 	for conn := range t.conns {
 		conn.Have(piece)
-		// Could check here if peer doesn't have piece, but due to caching
-		// some peers may have said they have a piece but they don't.
-		conn.upload()
 	}
 }
 
-- 
2.51.0