Sergey Matveev's repositories - btrtrc.git/commitdiff
Tweaks to storage error and completion handling
author Matt Joiner <anacrolix@gmail.com>
Sun, 11 Oct 2020 01:54:03 +0000 (12:54 +1100)
committer Matt Joiner <anacrolix@gmail.com>
Sun, 11 Oct 2020 01:54:03 +0000 (12:54 +1100)
peerconn.go
storage/file_piece.go
storage/wrappers.go
test/init_test.go
torrent.go

index 06537c9f97a30caabc6a6e158e9e7c217cbdd9f7..585cd886ae441f430c2d339a8a851917e06091a4 100644 (file)
@@ -1429,20 +1429,23 @@ another:
                        }
                        more, err := c.sendChunk(r, msg)
                        if err != nil {
+                               c.logger.WithDefaultLevel(log.Warning).Printf("sending chunk to peer: %v", err)
                                i := pieceIndex(r.Index)
                                if c.t.pieceComplete(i) {
+                                       // There used to be more code here that just duplicated the following break.
+                                       // Piece completions are currently cached, so I'm not sure how helpful this
+                                       // update is, except to pull in any completion changes that the failed read
+                                       // pushed to the storage backend before we got here.
                                        c.t.updatePieceCompletion(i)
-                                       if !c.t.pieceComplete(i) {
-                                               // We had the piece, but not anymore.
-                                               break another
-                                       }
                                }
-                               log.Str("error sending chunk to peer").AddValues(c, r, err).Log(c.t.logger)
-                               // If we failed to send a chunk, choke the peer to ensure they
-                               // flush all their requests. We've probably dropped a piece,
-                               // but there's no way to communicate this to the peer. If they
-                               // ask for it again, we'll kick them to allow us to send them
-                               // an updated bitfield.
+                               // If we failed to send a chunk, choke the peer by breaking out of the loop here to
+                               // ensure they flush all their requests. We've probably dropped a piece from
+                               // storage, but there's no way to communicate this to the peer. If they ask for it
+                               // again, we'll kick them to allow us to send them an updated bitfield on the next
+                               // connect.
+                               if c.choking {
+                                       c.logger.WithDefaultLevel(log.Warning).Printf("already choking peer, requests might not be rejected correctly")
+                               }
                                break another
                        }
                        delete(c.peerRequests, r)
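The error path above leans on the existing another: label: a failed send does not just skip one request, it breaks out of the whole request-serving loop so the peer ends up choked. A rough standalone sketch of that labeled-break pattern (the names and loop shape are illustrative, not the actual peerconn.go code):

package main

import (
	"errors"
	"fmt"
)

// serveRequests drains batches of requests until a send fails, at which point the
// whole loop is abandoned and the caller is told to choke the peer.
func serveRequests(queue [][]int, send func(int) error) (choke bool) {
another:
	for len(queue) > 0 {
		batch := queue[0]
		queue = queue[1:]
		for _, r := range batch {
			if err := send(r); err != nil {
				// Abort the outer loop entirely, not just this batch. Choking the
				// peer makes it flush its requests, and it can pick up an updated
				// bitfield when it asks again.
				choke = true
				break another
			}
		}
	}
	return
}

func main() {
	send := func(r int) error {
		if r == 2 {
			return errors.New("read from storage failed")
		}
		fmt.Println("sent chunk", r)
		return nil
	}
	fmt.Println("choke peer:", serveRequests([][]int{{1, 2}, {3}}, send))
}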
index a0b54db0bf2eeeae3ff7cd33745cad2e7860147a..07c6b298df50dabc16d59a1bb97340f76e37c46f 100644 (file)
@@ -28,13 +28,14 @@ func (fs *filePieceImpl) Completion() Completion {
                c.Ok = false
                return c
        }
-       // If it's allegedly complete, check that its constituent files have the
-       // necessary length.
-       for _, fi := range extentCompleteRequiredLengths(fs.p.Info, fs.p.Offset(), fs.p.Length()) {
-               s, err := os.Stat(fs.fileInfoName(fi))
-               if err != nil || s.Size() < fi.Length {
-                       c.Complete = false
-                       break
+       if c.Complete {
+               // If it's allegedly complete, check that its constituent files have the necessary length.
+               for _, fi := range extentCompleteRequiredLengths(fs.p.Info, fs.p.Offset(), fs.p.Length()) {
+                       s, err := os.Stat(fs.fileInfoName(fi))
+                       if err != nil || s.Size() < fi.Length {
+                               c.Complete = false
+                               break
+                       }
                }
        }
        if !c.Complete {
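The reworked Completion above only pays for the os.Stat calls when the piece is allegedly complete. A self-contained sketch of that length check in isolation (the extent type and helper name here are stand-ins, not the storage package's API): a piece only stays complete if every file it spans is at least as long as the piece requires.

package main

import (
	"fmt"
	"os"
)

type fileExtent struct {
	Path   string
	Length int64 // bytes of this file the piece needs to be present
}

// pieceFilesLongEnough reports whether every constituent file exists and has at
// least the required length; any stat error or short file demotes the piece.
func pieceFilesLongEnough(extents []fileExtent) bool {
	for _, fe := range extents {
		s, err := os.Stat(fe.Path)
		if err != nil || s.Size() < fe.Length {
			return false
		}
	}
	return true
}

func main() {
	ok := pieceFilesLongEnough([]fileExtent{{Path: "data/a.bin", Length: 1 << 14}})
	fmt.Println("still complete:", ok)
}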
index 97f52c00ddfdf352f1b302860541001b52a9930f..e8e136eba6515572799c6e39d60984bbecfe9730 100644 (file)
@@ -76,8 +76,15 @@ func (p Piece) ReadAt(b []byte, off int64) (n int, err error) {
                panic("io.Copy will get stuck")
        }
        off += int64(n)
-       if off < p.mip.Length() && err != nil {
-               p.MarkNotComplete()
+
+       // Doing this here may be inaccurate. There are legitimate reasons we may fail to read while
+       // the data is still there, such as too many open files. There should probably be a specific
+       // error to return if the data has been lost.
+       if off < p.mip.Length() {
+               if err == io.EOF {
+                       p.MarkNotComplete()
+               }
        }
+
        return
 }
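The comment above suggests a dedicated error for lost data rather than inferring loss from io.EOF. A hypothetical sketch of what such a sentinel could look like (ErrDataLost is not part of the storage package, it only illustrates the idea):

package main

import (
	"errors"
	"fmt"
	"io"
)

// ErrDataLost would let a storage backend say explicitly that the piece data is gone.
var ErrDataLost = errors.New("piece data lost from storage")

// handleReadError only demotes the piece when the backend reports the data is
// actually gone (or the read came up short), not on transient errors like EMFILE.
func handleReadError(err error, markNotComplete func()) {
	if errors.Is(err, ErrDataLost) || err == io.EOF {
		markNotComplete()
	}
}

func main() {
	handleReadError(ErrDataLost, func() { fmt.Println("piece marked not complete") })
	handleReadError(errors.New("too many open files"), func() { fmt.Println("should not print") })
}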
index 3aa4069587ff42e7238a2325969effc3cc06ad9b..9e63ef83cba7ba92f1df9fea88138b9823bc4ff3 100644 (file)
@@ -1,6 +1,7 @@
 package test
 
 import (
+       _ "github.com/anacrolix/envpprof"
        "github.com/anacrolix/torrent"
 )
 
index 1849dbcf3cb456934a9501f6aa6b08e09b36c371..d84e9e5c3e83ac74e759bc79c5112dfd4a0282a7 100644 (file)
@@ -1,6 +1,7 @@
 package torrent
 
 import (
+       "bytes"
        "container/heap"
        "context"
        "crypto/sha1"
@@ -792,8 +793,22 @@ func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, copyErr error)
        p.waitNoPendingWrites()
        ip := t.info.Piece(int(piece))
        pl := ip.Length()
-       _, copyErr = io.CopyN( // Return no error iff pl bytes are copied.
-               hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl), pl)
+       pieceReader := io.NewSectionReader(t.pieces[piece].Storage(), 0, pl)
+       var hashSource io.Reader
+       doCopy := func() {
+               // Return no error iff pl bytes are copied.
+               _, copyErr = io.CopyN(hash, hashSource, pl)
+       }
+       const logPieceContents = false
+       if logPieceContents {
+               var examineBuf bytes.Buffer
+               hashSource = io.TeeReader(pieceReader, &examineBuf)
+               doCopy()
+               log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), copyErr)
+       } else {
+               hashSource = pieceReader
+               doCopy()
+       }
        missinggo.CopyExact(&ret, hash.Sum(nil))
        return
 }
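The logPieceContents branch above tees the piece reader into a buffer so the exact bytes fed to the hash can be logged alongside the copy error. A minimal standalone sketch of that io.TeeReader technique:

package main

import (
	"bytes"
	"crypto/sha1"
	"io"
	"log"
	"strings"
)

func main() {
	src := strings.NewReader("piece contents")
	pl := int64(src.Len())

	var examineBuf bytes.Buffer
	hash := sha1.New()
	// Everything CopyN reads from src also lands in examineBuf via the tee.
	_, copyErr := io.CopyN(hash, io.TeeReader(src, &examineBuf), pl)
	log.Printf("hashed %q to %x with copy err %v", examineBuf.Bytes(), hash.Sum(nil), copyErr)
}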