-// doChunkReadStats records a received chunk of the given size against this peer's ConnStats.
-func (c *Peer) doChunkReadStats(size int64) {
- c.allStats(func(cs *ConnStats) { cs.receivedChunk(size) })
-}
-
-// Handle a received chunk from a peer.
-func (c *Peer) receiveChunk(msg *pp.Message) error {
- chunksReceived.Add("total", 1)
-
- ppReq := newRequestFromMessage(msg)
- req := c.t.requestIndexFromRequest(ppReq)
-
- if c.peerChoking {
- chunksReceived.Add("while choked", 1)
- }
-
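- // validReceiveChunks counts how many responses we still expect for each outstanding request
- // index; a chunk we aren't expecting at all is rejected.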
- if c.validReceiveChunks[req] <= 0 {
- chunksReceived.Add("unexpected", 1)
- return errors.New("received unexpected chunk")
- }
- c.decExpectedChunkReceive(req)
-
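- // Under the Fast Extension (BEP 6), the peer may satisfy requests for pieces it has marked
- // allowed-fast even while choking us; count those chunks separately.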
- if c.peerChoking && c.peerAllowedFast.Contains(bitmap.BitIndex(ppReq.Index)) {
- chunksReceived.Add("due to allowed fast", 1)
- }
-
- // The request needs to be deleted immediately to prevent cancels occurring asynchronously when
- // we have actually already received the chunk, while we have the Client unlocked to write the
- // data out.
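- // Whether we had actually requested this chunk (or had only just cancelled it), as opposed to
- // it arriving unsolicited.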
- intended := false
- {
- if c.requestState.Requests.Contains(req) {
- for _, f := range c.callbacks.ReceivedRequested {
- f(PeerMessageEvent{c, msg})
- }
- }
- // Request has been satisfied.
- if c.deleteRequest(req) || c.requestState.Cancelled.CheckedRemove(req) {
- intended = true
- if !c.peerChoking {
- c._chunksReceivedWhileExpecting++
- }
- if c.isLowOnRequests() {
- c.updateRequests("Peer.receiveChunk deleted request")
- }
- } else {
- chunksReceived.Add("unintended", 1)
- }
- }
-
- t := c.t
- cl := t.cl
-
- // Do we actually want this chunk?
- if t.haveChunk(ppReq) {
- // panic(fmt.Sprintf("%+v", ppReq))
- chunksReceived.Add("redundant", 1)
- c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
- return nil
- }
-
- piece := &t.pieces[ppReq.Index]
-
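- // Account the chunk towards useful-data stats; data we actually asked for is also counted
- // separately below.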
- c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
- c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
- if intended {
- c.piecesReceivedSinceLastRequestUpdate++
- c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
- }
- for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData {
- f(ReceivedUsefulDataEvent{c, msg})
- }
- c.lastUsefulChunkReceived = time.Now()
-
- // Record that the chunk hasn't been written to storage yet, before we attempt to do anything
- // with it.
- piece.incrementPendingWrites()
- // Record that we have the chunk, so we aren't trying to download it while
- // waiting for it to be written to storage.
- piece.unpendChunkIndex(chunkIndexFromChunkSpec(ppReq.ChunkSpec, t.chunkSize))
-
- // Cancel pending requests for this chunk from *other* peers.
- if p := t.pendingRequests[req]; p != nil {
- if p == c {
- panic("should not be pending request from conn that just received it")
- }
- p.cancel(req)
- }
-
- err := func() error {
- cl.unlock()
- defer cl.lock()
- concurrentChunkWrites.Add(1)
- defer concurrentChunkWrites.Add(-1)
- // Write the chunk out. Note that the upper bound on chunk writing concurrency will be the
- // number of connections. We write inline with receiving the chunk (with this lock dance),
- // because we want to handle errors synchronously and I haven't thought of a nice way to
- // defer any concurrency to the storage and have that notify the client of errors. TODO: Do
- // that instead.
- return t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
- }()
-
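- // The write attempt has finished (successfully or not), so it no longer blocks queueing the
- // piece for hashing.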
- piece.decrementPendingWrites()
-
- if err != nil {
- c.logger.WithDefaultLevel(log.Error).Printf("writing received chunk %v: %v", req, err)
- t.pendRequest(req)
- // Necessary to pass TestReceiveChunkStorageFailureSeederFastExtensionDisabled. I think a
- // request update runs while we're writing the chunk that just failed. Then we never do a
- // fresh update after pending the failed request.
- c.updateRequests("Peer.receiveChunk error writing chunk")
- t.onWriteChunkErr(err)
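-  // A local storage failure isn't the peer's fault, so don't error out the connection.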
- return nil
- }
-
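- // Remember which peers contributed data to the piece, so blame can be assigned if it later
- // fails its hash check.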
- c.onDirtiedPiece(pieceIndex(ppReq.Index))
-
- // We need to ensure the piece is only queued once, so only the last chunk writer gets this job.
- if t.pieceAllDirty(pieceIndex(ppReq.Index)) && piece.pendingWrites == 0 {
- t.queuePieceCheck(pieceIndex(ppReq.Index))
-  // We don't pend all chunks here anymore because we don't want code that depends on the dirty
-  // chunk status (such as the haveChunk call above) to have to check all the various other piece
-  // states like queued for hash, hashing, etc. This does mean we need to be sure that chunks are
-  // re-pended at an appropriate time later, however.
- }
-
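- // Wake anything waiting on Client state changes (e.g. piece-completion waiters).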
- cl.event.Broadcast()
- // We do this because we've just written a chunk, which may change PieceState.Partial.
- t.publishPieceChange(pieceIndex(ppReq.Index))
-
- return nil
-}
-
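-// onDirtiedPiece records, on both the peer (peerTouchedPieces) and the piece (dirtiers), that this
-// peer has contributed data to the given piece.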
-func (c *Peer) onDirtiedPiece(piece pieceIndex) {
- if c.peerTouchedPieces == nil {
- c.peerTouchedPieces = make(map[pieceIndex]struct{})
- }
- c.peerTouchedPieces[piece] = struct{}{}
- ds := &c.t.pieces[piece].dirtiers
- if *ds == nil {
- *ds = make(map[*Peer]struct{})
- }
- (*ds)[c] = struct{}{}
-}
-