From: Matt Joiner Date: Mon, 18 Sep 2017 02:09:08 +0000 (+1000) Subject: Move uploading to the connection writer X-Git-Tag: v1.0.0~388 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=bb53c97d385fff16bf0b3e3980f3d20b7def6a6b;p=btrtrc.git Move uploading to the connection writer --- diff --git a/client.go b/client.go index cca19e1e..781ebb14 100644 --- a/client.go +++ b/client.go @@ -1142,7 +1142,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connect } } -func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error { +func (cl *Client) sendChunk(t *Torrent, c *connection, r request, msg func(pp.Message) bool) (more bool, err error) { // Count the chunk being sent, even if it isn't. b := make([]byte, r.Length) p := t.info.Piece(int(r.Index)) @@ -1151,9 +1151,9 @@ func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error { if err == nil { panic("expected error") } - return err + return } - c.Post(pp.Message{ + more = msg(pp.Message{ Type: pp.Piece, Index: r.Index, Begin: r.Begin, @@ -1162,7 +1162,7 @@ func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error { c.chunksSent++ uploadChunksPosted.Add(1) c.lastChunkSent = time.Now() - return nil + return } func (cl *Client) openNewConns(t *Torrent) { diff --git a/connection.go b/connection.go index dc7b8390..752f79f4 100644 --- a/connection.go +++ b/connection.go @@ -99,6 +99,7 @@ type connection struct { pieceRequestOrder prioritybitmap.PriorityBitmap postedBuffer bytes.Buffer + uploadTimer *time.Timer writerCond sync.Cond } @@ -297,25 +298,25 @@ func (cn *connection) PeerCancel(r request) bool { return true } -func (cn *connection) Choke() { +func (cn *connection) Choke(msg func(pp.Message) bool) bool { if cn.Choked { - return + return true } - cn.Post(pp.Message{ - Type: pp.Choke, - }) cn.PeerRequests = nil cn.Choked = true + return msg(pp.Message{ + Type: pp.Choke, + }) } -func (cn *connection) Unchoke() { +func (cn *connection) Unchoke(msg func(pp.Message) bool) bool { if !cn.Choked { - return + return true } - cn.Post(pp.Message{ + cn.Choked = false + return msg(pp.Message{ Type: pp.Unchoke, }) - cn.Choked = false } func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool { @@ -378,6 +379,7 @@ func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) { // have more write buffer space. cn.requestsLowWater = len(cn.requests) / 2 } + cn.upload(msg) } // Writes buffers to the socket from the write channel. @@ -757,33 +759,24 @@ func (c *connection) mainReadLoop() error { c.tickleWriter() case pp.Interested: c.PeerInterested = true - c.upload() + c.tickleWriter() case pp.NotInterested: c.PeerInterested = false - c.Choke() + c.PeerRequests = nil case pp.Have: err = c.peerSentHave(int(msg.Index)) case pp.Request: if c.Choked { break } - if !c.PeerInterested { - err = errors.New("peer sent request but isn't interested") - break - } - if !t.havePiece(msg.Index.Int()) { - // This isn't necessarily them screwing up. We can drop pieces - // from our storage, and can't communicate this to peers - // except by reconnecting. - requestsReceivedForMissingPieces.Add(1) - err = errors.New("peer requested piece we don't have") + if len(c.PeerRequests) >= maxRequests { break } if c.PeerRequests == nil { c.PeerRequests = make(map[request]struct{}, maxRequests) } c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{} - c.upload() + c.tickleWriter() case pp.Cancel: req := newRequest(msg.Index, msg.Begin, msg.Length) if !c.PeerCancel(req) { @@ -971,8 +964,6 @@ func (c *connection) receiveChunk(msg *pp.Message) { c.UsefulChunksReceived++ c.lastUsefulChunkReceived = time.Now() - c.upload() - // Need to record that it hasn't been written yet, before we attempt to do // anything with it. piece.incrementPendingWrites() @@ -1016,40 +1007,45 @@ func (c *connection) receiveChunk(msg *pp.Message) { } // Also handles choking and unchoking of the remote peer. -func (c *connection) upload() { +func (c *connection) upload(msg func(pp.Message) bool) bool { t := c.t cl := t.cl if cl.config.NoUpload { - return + return true } if !c.PeerInterested { - return + return true } seeding := t.seeding() if !seeding && !c.peerHasWantedPieces() { // There's no reason to upload to this peer. - return + return true } // Breaking or completing this loop means we don't want to upload to the // peer anymore, and we choke them. another: for seeding || c.chunksSent < c.UsefulChunksReceived+6 { // We want to upload to the peer. - c.Unchoke() + if !c.Unchoke(msg) { + return false + } for r := range c.PeerRequests { res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length)) + if !res.OK() { + panic(res) + } delay := res.Delay() if delay > 0 { res.Cancel() - go func() { - time.Sleep(delay) - cl.mu.Lock() - defer cl.mu.Unlock() - c.upload() - }() - return + if c.uploadTimer == nil { + c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast) + } else { + c.uploadTimer.Reset(delay) + } + // Hard to say what to return here. + return true } - err := cl.sendChunk(t, c, r) + more, err := cl.sendChunk(t, c, r, msg) if err != nil { i := int(r.Index) if t.pieceComplete(i) { @@ -1068,11 +1064,14 @@ another: break another } delete(c.PeerRequests, r) + if !more { + return false + } goto another } - return + return true } - c.Choke() + return c.Choke(msg) } func (cn *connection) Drop() { diff --git a/torrent.go b/torrent.go index 1d34b62c..641f8fc0 100644 --- a/torrent.go +++ b/torrent.go @@ -1481,9 +1481,6 @@ func (t *Torrent) onPieceCompleted(piece int) { t.cancelRequestsForPiece(piece) for conn := range t.conns { conn.Have(piece) - // Could check here if peer doesn't have piece, but due to caching - // some peers may have said they have a piece but they don't. - conn.upload() } }