From 417d7d1d486ffdf4369a9530bc27dfc3704a0dfb Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 11 Oct 2020 12:54:03 +1100 Subject: [PATCH] Tweaks to storage error and completion handling --- peerconn.go | 23 +++++++++++++---------- storage/file_piece.go | 15 ++++++++------- storage/wrappers.go | 11 +++++++++-- test/init_test.go | 1 + torrent.go | 19 +++++++++++++++++-- 5 files changed, 48 insertions(+), 21 deletions(-) diff --git a/peerconn.go b/peerconn.go index 06537c9f..585cd886 100644 --- a/peerconn.go +++ b/peerconn.go @@ -1429,20 +1429,23 @@ another: } more, err := c.sendChunk(r, msg) if err != nil { + c.logger.WithDefaultLevel(log.Warning).Printf("sending chunk to peer: %v", err) i := pieceIndex(r.Index) if c.t.pieceComplete(i) { + // There used to be more code here that just duplicated the following break. + // Piece completions are currently cached, so I'm not sure how helpful this + // update is, except to pull any completion changes pushed to the storage + // backend in failed reads that got us here. c.t.updatePieceCompletion(i) - if !c.t.pieceComplete(i) { - // We had the piece, but not anymore. - break another - } } - log.Str("error sending chunk to peer").AddValues(c, r, err).Log(c.t.logger) - // If we failed to send a chunk, choke the peer to ensure they - // flush all their requests. We've probably dropped a piece, - // but there's no way to communicate this to the peer. If they - // ask for it again, we'll kick them to allow us to send them - // an updated bitfield. + // If we failed to send a chunk, choke the peer by breaking out of the loop here to + // ensure they flush all their requests. We've probably dropped a piece from + // storage, but there's no way to communicate this to the peer. If they ask for it + // again, we'll kick them to allow us to send them an updated bitfield on the next + // connect. 
+ if c.choking { + c.logger.WithDefaultLevel(log.Warning).Printf("already choking peer, requests might not be rejected correctly") + } break another } delete(c.peerRequests, r) diff --git a/storage/file_piece.go b/storage/file_piece.go index a0b54db0..07c6b298 100644 --- a/storage/file_piece.go +++ b/storage/file_piece.go @@ -28,13 +28,14 @@ func (fs *filePieceImpl) Completion() Completion { c.Ok = false return c } - // If it's allegedly complete, check that its constituent files have the - // necessary length. - for _, fi := range extentCompleteRequiredLengths(fs.p.Info, fs.p.Offset(), fs.p.Length()) { - s, err := os.Stat(fs.fileInfoName(fi)) - if err != nil || s.Size() < fi.Length { - c.Complete = false - break + if c.Complete { + // If it's allegedly complete, check that its constituent files have the necessary length. + for _, fi := range extentCompleteRequiredLengths(fs.p.Info, fs.p.Offset(), fs.p.Length()) { + s, err := os.Stat(fs.fileInfoName(fi)) + if err != nil || s.Size() < fi.Length { + c.Complete = false + break + } } } if !c.Complete { diff --git a/storage/wrappers.go b/storage/wrappers.go index 97f52c00..e8e136eb 100644 --- a/storage/wrappers.go +++ b/storage/wrappers.go @@ -76,8 +76,15 @@ func (p Piece) ReadAt(b []byte, off int64) (n int, err error) { panic("io.Copy will get stuck") } off += int64(n) - if off < p.mip.Length() && err != nil { - p.MarkNotComplete() + + // Doing this here may be inaccurate. There are legitimate reasons we may fail to read while the + // data is still there, such as too many open files. There should probably be a specific error + // to return if the data has been lost. 
+ if off < p.mip.Length() { + if err == io.EOF { + p.MarkNotComplete() + } } + return } diff --git a/test/init_test.go b/test/init_test.go index 3aa40695..9e63ef83 100644 --- a/test/init_test.go +++ b/test/init_test.go @@ -1,6 +1,7 @@ package test import ( + _ "github.com/anacrolix/envpprof" "github.com/anacrolix/torrent" ) diff --git a/torrent.go b/torrent.go index 1849dbcf..d84e9e5c 100644 --- a/torrent.go +++ b/torrent.go @@ -1,6 +1,7 @@ package torrent import ( + "bytes" "container/heap" "context" "crypto/sha1" @@ -792,8 +793,22 @@ func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, copyErr error) p.waitNoPendingWrites() ip := t.info.Piece(int(piece)) pl := ip.Length() - _, copyErr = io.CopyN( // Return no error iff pl bytes are copied. - hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl), pl) + pieceReader := io.NewSectionReader(t.pieces[piece].Storage(), 0, pl) + var hashSource io.Reader + doCopy := func() { + // Return no error iff pl bytes are copied. + _, copyErr = io.CopyN(hash, hashSource, pl) + } + const logPieceContents = false + if logPieceContents { + var examineBuf bytes.Buffer + hashSource = io.TeeReader(pieceReader, &examineBuf) + doCopy() + log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), copyErr) + } else { + hashSource = pieceReader + doCopy() + } missinggo.CopyExact(&ret, hash.Sum(nil)) return } -- 2.48.1