client.go | 72 ----------------------------------------------------- connection.go | 72 +++++++++++++++++++++++++++++++++++++++++++++++++++++ diff --git a/client.go b/client.go index d28f9b9db784e080eaeafb068081182f3d836108..7ce5686f04e32ad2479c9531189906f10e9e2baa 100644 --- a/client.go +++ b/client.go @@ -1332,78 +1332,6 @@ } return true } -// Handle a received chunk from a peer. -func (cl *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) { - chunksReceived.Add(1) - - req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece))) - - // Request has been satisfied. - if cl.connDeleteRequest(t, c, req) { - defer c.updateRequests() - } else { - unexpectedChunksReceived.Add(1) - } - - index := int(req.Index) - piece := &t.pieces[index] - - // Do we actually want this chunk? - if !t.wantPiece(req) { - unwantedChunksReceived.Add(1) - c.UnwantedChunksReceived++ - return - } - - c.UsefulChunksReceived++ - c.lastUsefulChunkReceived = time.Now() - - cl.upload(t, c) - - // Need to record that it hasn't been written yet, before we attempt to do - // anything with it. - piece.incrementPendingWrites() - // Record that we have the chunk. - piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize)) - - // Cancel pending requests for this chunk. - for _, c := range t.conns { - if cl.connCancel(t, c, req) { - c.updateRequests() - } - } - - cl.mu.Unlock() - // Write the chunk out. Note that the upper bound on chunk writing - // concurrency will be the number of connections. - err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece) - cl.mu.Lock() - - piece.decrementPendingWrites() - - if err != nil { - log.Printf("%s: error writing chunk %v: %s", t, req, err) - t.pendRequest(req) - t.updatePieceCompletion(int(msg.Index)) - return - } - - // It's important that the piece is potentially queued before we check if - // the piece is still wanted, because if it is queued, it won't be wanted. - if t.pieceAllDirty(index) { - cl.queuePieceCheck(t, int(req.Index)) - } - - if c.peerTouchedPieces == nil { - c.peerTouchedPieces = make(map[int]struct{}) - } - c.peerTouchedPieces[index] = struct{}{} - - cl.event.Broadcast() - t.publishPieceChange(int(req.Index)) - return -} - // Return the connections that touched a piece, and clear the entry while // doing it. func (cl *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) { diff --git a/connection.go b/connection.go index 7e88f837d53ebfa4267a14f2f104323a9992ae90..be44a29048b5e0791088a21c965a14d334e040a1 100644 --- a/connection.go +++ b/connection.go @@ -926,3 +926,75 @@ io.Reader io.Writer }{cn.r, cn.w} } + +// Handle a received chunk from a peer. +func (cl *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) { + chunksReceived.Add(1) + + req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece))) + + // Request has been satisfied. + if cl.connDeleteRequest(t, c, req) { + defer c.updateRequests() + } else { + unexpectedChunksReceived.Add(1) + } + + index := int(req.Index) + piece := &t.pieces[index] + + // Do we actually want this chunk? + if !t.wantPiece(req) { + unwantedChunksReceived.Add(1) + c.UnwantedChunksReceived++ + return + } + + c.UsefulChunksReceived++ + c.lastUsefulChunkReceived = time.Now() + + cl.upload(t, c) + + // Need to record that it hasn't been written yet, before we attempt to do + // anything with it. + piece.incrementPendingWrites() + // Record that we have the chunk. + piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize)) + + // Cancel pending requests for this chunk. + for _, c := range t.conns { + if cl.connCancel(t, c, req) { + c.updateRequests() + } + } + + cl.mu.Unlock() + // Write the chunk out. Note that the upper bound on chunk writing + // concurrency will be the number of connections. + err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece) + cl.mu.Lock() + + piece.decrementPendingWrites() + + if err != nil { + log.Printf("%s: error writing chunk %v: %s", t, req, err) + t.pendRequest(req) + t.updatePieceCompletion(int(msg.Index)) + return + } + + // It's important that the piece is potentially queued before we check if + // the piece is still wanted, because if it is queued, it won't be wanted. + if t.pieceAllDirty(index) { + cl.queuePieceCheck(t, int(req.Index)) + } + + if c.peerTouchedPieces == nil { + c.peerTouchedPieces = make(map[int]struct{}) + } + c.peerTouchedPieces[index] = struct{}{} + + cl.event.Broadcast() + t.publishPieceChange(int(req.Index)) + return +}