From: Matt Joiner
Date: Tue, 22 Nov 2016 03:16:18 +0000 (+1100)
Subject: Move Client.downloadedChunk to connection.go
X-Git-Tag: v1.0.0~536
X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=0d9348c5d1f1dba2c7a7105e4bf106664f9b306f;p=btrtrc.git

Move Client.downloadedChunk to connection.go
---

diff --git a/client.go b/client.go
index d28f9b9d..7ce5686f 100644
--- a/client.go
+++ b/client.go
@@ -1332,78 +1332,6 @@ func (cl *Client) WaitAll() bool {
 	return true
 }
 
-// Handle a received chunk from a peer.
-func (cl *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) {
-	chunksReceived.Add(1)
-
-	req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
-
-	// Request has been satisfied.
-	if cl.connDeleteRequest(t, c, req) {
-		defer c.updateRequests()
-	} else {
-		unexpectedChunksReceived.Add(1)
-	}
-
-	index := int(req.Index)
-	piece := &t.pieces[index]
-
-	// Do we actually want this chunk?
-	if !t.wantPiece(req) {
-		unwantedChunksReceived.Add(1)
-		c.UnwantedChunksReceived++
-		return
-	}
-
-	c.UsefulChunksReceived++
-	c.lastUsefulChunkReceived = time.Now()
-
-	cl.upload(t, c)
-
-	// Need to record that it hasn't been written yet, before we attempt to do
-	// anything with it.
-	piece.incrementPendingWrites()
-	// Record that we have the chunk.
-	piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
-
-	// Cancel pending requests for this chunk.
-	for _, c := range t.conns {
-		if cl.connCancel(t, c, req) {
-			c.updateRequests()
-		}
-	}
-
-	cl.mu.Unlock()
-	// Write the chunk out. Note that the upper bound on chunk writing
-	// concurrency will be the number of connections.
-	err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
-	cl.mu.Lock()
-
-	piece.decrementPendingWrites()
-
-	if err != nil {
-		log.Printf("%s: error writing chunk %v: %s", t, req, err)
-		t.pendRequest(req)
-		t.updatePieceCompletion(int(msg.Index))
-		return
-	}
-
-	// It's important that the piece is potentially queued before we check if
-	// the piece is still wanted, because if it is queued, it won't be wanted.
-	if t.pieceAllDirty(index) {
-		cl.queuePieceCheck(t, int(req.Index))
-	}
-
-	if c.peerTouchedPieces == nil {
-		c.peerTouchedPieces = make(map[int]struct{})
-	}
-	c.peerTouchedPieces[index] = struct{}{}
-
-	cl.event.Broadcast()
-	t.publishPieceChange(int(req.Index))
-	return
-}
-
 // Return the connections that touched a piece, and clear the entry while
 // doing it.
 func (cl *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) {
diff --git a/connection.go b/connection.go
index 7e88f837..be44a290 100644
--- a/connection.go
+++ b/connection.go
@@ -926,3 +926,75 @@ func (cn *connection) rw() io.ReadWriter {
 		io.Writer
 	}{cn.r, cn.w}
 }
+
+// Handle a received chunk from a peer.
+func (cl *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) {
+	chunksReceived.Add(1)
+
+	req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
+
+	// Request has been satisfied.
+	if cl.connDeleteRequest(t, c, req) {
+		defer c.updateRequests()
+	} else {
+		unexpectedChunksReceived.Add(1)
+	}
+
+	index := int(req.Index)
+	piece := &t.pieces[index]
+
+	// Do we actually want this chunk?
+	if !t.wantPiece(req) {
+		unwantedChunksReceived.Add(1)
+		c.UnwantedChunksReceived++
+		return
+	}
+
+	c.UsefulChunksReceived++
+	c.lastUsefulChunkReceived = time.Now()
+
+	cl.upload(t, c)
+
+	// Need to record that it hasn't been written yet, before we attempt to do
+	// anything with it.
+	piece.incrementPendingWrites()
+	// Record that we have the chunk.
+	piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
+
+	// Cancel pending requests for this chunk.
+	for _, c := range t.conns {
+		if cl.connCancel(t, c, req) {
+			c.updateRequests()
+		}
+	}
+
+	cl.mu.Unlock()
+	// Write the chunk out. Note that the upper bound on chunk writing
+	// concurrency will be the number of connections.
+	err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
+	cl.mu.Lock()
+
+	piece.decrementPendingWrites()
+
+	if err != nil {
+		log.Printf("%s: error writing chunk %v: %s", t, req, err)
+		t.pendRequest(req)
+		t.updatePieceCompletion(int(msg.Index))
+		return
+	}
+
+	// It's important that the piece is potentially queued before we check if
+	// the piece is still wanted, because if it is queued, it won't be wanted.
+	if t.pieceAllDirty(index) {
+		cl.queuePieceCheck(t, int(req.Index))
+	}
+
+	if c.peerTouchedPieces == nil {
+		c.peerTouchedPieces = make(map[int]struct{})
+	}
+	c.peerTouchedPieces[index] = struct{}{}
+
+	cl.event.Broadcast()
+	t.publishPieceChange(int(req.Index))
+	return
+}
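
The subtle part of the moved function is the locking around t.writeChunk: the write is recorded with incrementPendingWrites while Client.mu is held, the mutex is released for the (potentially slow) disk write, and it is reacquired before decrementPendingWrites, so chunk-write concurrency is bounded only by the number of connections. The following is a minimal, self-contained Go sketch of that pattern, not part of the patch; the store type, its put method, ioMu, and pendingWrites are hypothetical stand-ins for the real Torrent, Piece, and storage internals.

package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical stand-in for a torrent's piece storage.
type store struct {
	mu            sync.Mutex     // plays the role of Client.mu
	pendingWrites int            // per-piece pending-write accounting in the real code
	ioMu          sync.Mutex     // storage-level serialization, separate from mu
	data          map[int][]byte // chunk index -> chunk bytes
}

// writeChunk records the write while holding mu, releases mu for the slow
// write itself, then reacquires it to finish the bookkeeping. Concurrency is
// naturally bounded by the number of callers, i.e. the number of connections.
func (s *store) writeChunk(index int, b []byte) error {
	s.mu.Lock()
	s.pendingWrites++ // record that a write is outstanding before doing it
	s.mu.Unlock()

	err := s.put(index, b) // slow I/O happens without mu held

	s.mu.Lock()
	s.pendingWrites--
	s.mu.Unlock()
	return err
}

// put copies the chunk into a map under its own lock; a real implementation
// would write to disk instead.
func (s *store) put(index int, b []byte) error {
	s.ioMu.Lock()
	defer s.ioMu.Unlock()
	if s.data == nil {
		s.data = make(map[int][]byte)
	}
	s.data[index] = append([]byte(nil), b...)
	return nil
}

func main() {
	s := &store{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // four "connections" delivering chunks concurrently
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = s.writeChunk(i, []byte("chunk data"))
		}(i)
	}
	wg.Wait()
	fmt.Println("chunks stored:", len(s.data))
}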