return true
}
-// Handle a received chunk from a peer.
-func (cl *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) {
- chunksReceived.Add(1)
-
- req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
-
- // Request has been satisfied.
- if cl.connDeleteRequest(t, c, req) {
- defer c.updateRequests()
- } else {
- unexpectedChunksReceived.Add(1)
- }
-
- index := int(req.Index)
- piece := &t.pieces[index]
-
- // Do we actually want this chunk?
- if !t.wantPiece(req) {
- unwantedChunksReceived.Add(1)
- c.UnwantedChunksReceived++
- return
- }
-
- c.UsefulChunksReceived++
- c.lastUsefulChunkReceived = time.Now()
-
- cl.upload(t, c)
-
- // Need to record that it hasn't been written yet, before we attempt to do
- // anything with it.
- piece.incrementPendingWrites()
- // Record that we have the chunk.
- piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
-
- // Cancel pending requests for this chunk.
- for _, c := range t.conns {
- if cl.connCancel(t, c, req) {
- c.updateRequests()
- }
- }
-
- cl.mu.Unlock()
- // Write the chunk out. Note that the upper bound on chunk writing
- // concurrency will be the number of connections.
- err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
- cl.mu.Lock()
-
- piece.decrementPendingWrites()
-
- if err != nil {
- log.Printf("%s: error writing chunk %v: %s", t, req, err)
- t.pendRequest(req)
- t.updatePieceCompletion(int(msg.Index))
- return
- }
-
- // It's important that the piece is potentially queued before we check if
- // the piece is still wanted, because if it is queued, it won't be wanted.
- if t.pieceAllDirty(index) {
- cl.queuePieceCheck(t, int(req.Index))
- }
-
- if c.peerTouchedPieces == nil {
- c.peerTouchedPieces = make(map[int]struct{})
- }
- c.peerTouchedPieces[index] = struct{}{}
-
- cl.event.Broadcast()
- t.publishPieceChange(int(req.Index))
- return
-}
-
// Return the connections that touched a piece, and clear the entry while
// doing it.
func (cl *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) {
io.Writer
}{cn.r, cn.w}
}
+
+// Handle a received chunk from a peer.
+func (cl *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) {
+ chunksReceived.Add(1)
+
+ req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
+
+ // Request has been satisfied.
+ if cl.connDeleteRequest(t, c, req) {
+ defer c.updateRequests()
+ } else {
+ unexpectedChunksReceived.Add(1)
+ }
+
+ index := int(req.Index)
+ piece := &t.pieces[index]
+
+ // Do we actually want this chunk?
+ if !t.wantPiece(req) {
+ unwantedChunksReceived.Add(1)
+ c.UnwantedChunksReceived++
+ return
+ }
+
+ c.UsefulChunksReceived++
+ c.lastUsefulChunkReceived = time.Now()
+
+ cl.upload(t, c)
+
+ // Need to record that it hasn't been written yet, before we attempt to do
+ // anything with it.
+ piece.incrementPendingWrites()
+ // Record that we have the chunk.
+ piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
+
+ // Cancel pending requests for this chunk.
+ for _, c := range t.conns {
+ if cl.connCancel(t, c, req) {
+ c.updateRequests()
+ }
+ }
+
+ cl.mu.Unlock()
+ // Write the chunk out. Note that the upper bound on chunk writing
+ // concurrency will be the number of connections.
+ err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
+ cl.mu.Lock()
+
+ piece.decrementPendingWrites()
+
+ if err != nil {
+ log.Printf("%s: error writing chunk %v: %s", t, req, err)
+ t.pendRequest(req)
+ t.updatePieceCompletion(int(msg.Index))
+ return
+ }
+
+ // It's important that the piece is potentially queued before we check if
+ // the piece is still wanted, because if it is queued, it won't be wanted.
+ if t.pieceAllDirty(index) {
+ cl.queuePieceCheck(t, int(req.Index))
+ }
+
+ if c.peerTouchedPieces == nil {
+ c.peerTouchedPieces = make(map[int]struct{})
+ }
+ c.peerTouchedPieces[index] = struct{}{}
+
+ cl.event.Broadcast()
+ t.publishPieceChange(int(req.Index))
+ return
+}