}
more, err := c.sendChunk(r, msg)
if err != nil {
+ c.logger.WithDefaultLevel(log.Warning).Printf("sending chunk to peer: %v", err)
i := pieceIndex(r.Index)
if c.t.pieceComplete(i) {
+ // There used to be more code here that just duplicated the following break.
+ // Piece completions are currently cached, so I'm not sure how helpful this
+ // update is, except to pull any completion changes pushed to the storage
+ // backend in failed reads that got us here.
c.t.updatePieceCompletion(i)
- if !c.t.pieceComplete(i) {
- // We had the piece, but not anymore.
- break another
- }
}
- log.Str("error sending chunk to peer").AddValues(c, r, err).Log(c.t.logger)
- // If we failed to send a chunk, choke the peer to ensure they
- // flush all their requests. We've probably dropped a piece,
- // but there's no way to communicate this to the peer. If they
- // ask for it again, we'll kick them to allow us to send them
- // an updated bitfield.
+ // If we failed to send a chunk, choke the peer by breaking out of the loop here to
+ // ensure they flush all their requests. We've probably dropped a piece from
+ // storage, but there's no way to communicate this to the peer. If they ask for it
+ // again, we'll kick them to allow us to send them an updated bitfield on the next
+ // connect.
+ if c.choking {
+ c.logger.WithDefaultLevel(log.Warning).Printf("already choking peer, requests might not be rejected correctly")
+ }
break another
}
delete(c.peerRequests, r)
c.Ok = false
return c
}
- // If it's allegedly complete, check that its constituent files have the
- // necessary length.
- for _, fi := range extentCompleteRequiredLengths(fs.p.Info, fs.p.Offset(), fs.p.Length()) {
- s, err := os.Stat(fs.fileInfoName(fi))
- if err != nil || s.Size() < fi.Length {
- c.Complete = false
- break
+ if c.Complete {
+ // If it's allegedly complete, check that its constituent files have the necessary length.
+ for _, fi := range extentCompleteRequiredLengths(fs.p.Info, fs.p.Offset(), fs.p.Length()) {
+ s, err := os.Stat(fs.fileInfoName(fi))
+ if err != nil || s.Size() < fi.Length {
+ c.Complete = false
+ break
+ }
}
}
if !c.Complete {
panic("io.Copy will get stuck")
}
off += int64(n)
- if off < p.mip.Length() && err != nil {
- p.MarkNotComplete()
+
+ // Doing this here may be inaccurate. There's legitimate reasons we may fail to read while the
+ // data is still there, such as too many open files. There should probably be a specific error
+ // to return if the data has been lost.
+ if off < p.mip.Length() {
+ if err == io.EOF {
+ p.MarkNotComplete()
+ }
}
+
return
}
package test
import (
+ _ "github.com/anacrolix/envpprof"
"github.com/anacrolix/torrent"
)
package torrent
import (
+ "bytes"
"container/heap"
"context"
"crypto/sha1"
p.waitNoPendingWrites()
ip := t.info.Piece(int(piece))
pl := ip.Length()
- _, copyErr = io.CopyN( // Return no error iff pl bytes are copied.
- hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl), pl)
+ pieceReader := io.NewSectionReader(t.pieces[piece].Storage(), 0, pl)
+ var hashSource io.Reader
+ doCopy := func() {
+ // Return no error iff pl bytes are copied.
+ _, copyErr = io.CopyN(hash, hashSource, pl)
+ }
+ const logPieceContents = false
+ if logPieceContents {
+ var examineBuf bytes.Buffer
+ hashSource = io.TeeReader(pieceReader, &examineBuf)
+ doCopy()
+ log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), copyErr)
+ } else {
+ hashSource = pieceReader
+ doCopy()
+ }
missinggo.CopyExact(&ret, hash.Sum(nil))
return
}