From bc0936c44ac0e1ad04e31d35a24cd2b253f0bb94 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 4 Feb 2021 18:42:43 +1100 Subject: [PATCH] Use locks on piece per resource pieces to prevent races in reading incomplete data --- storage/piece_resource.go | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/storage/piece_resource.go b/storage/piece_resource.go index c723831f..66d877fb 100644 --- a/storage/piece_resource.go +++ b/storage/piece_resource.go @@ -38,6 +38,7 @@ func NewResourcePiecesOpts(p PieceProvider, opts ResourcePiecesOpts) ClientImpl type piecePerResourceTorrentImpl struct { piecePerResource + locks []sync.RWMutex } func (piecePerResourceTorrentImpl) Close() error { @@ -45,13 +46,17 @@ func (piecePerResourceTorrentImpl) Close() error { } func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) { - return piecePerResourceTorrentImpl{s}, nil + return piecePerResourceTorrentImpl{ + s, + make([]sync.RWMutex, info.NumPieces()), + }, nil } -func (s piecePerResource) Piece(p metainfo.Piece) PieceImpl { +func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece) PieceImpl { return piecePerResourcePiece{ mp: p, - piecePerResource: s, + piecePerResource: s.piecePerResource, + mu: &s.locks[p.Index()], } } @@ -66,11 +71,16 @@ type ConsecutiveChunkReader interface { type piecePerResourcePiece struct { mp metainfo.Piece piecePerResource + // This protects operations that move complete/incomplete pieces around, which can trigger read + // errors that may cause callers to do more drastic things. + mu *sync.RWMutex } var _ io.WriterTo = piecePerResourcePiece{} func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) { + s.mu.RLock() + defer s.mu.RUnlock() if s.mustIsComplete() { r, err := s.completed().Get() if err != nil { @@ -105,6 +115,8 @@ func (s piecePerResourcePiece) mustIsComplete() bool { } func (s piecePerResourcePiece) Completion() Completion { + s.mu.RLock() + defer s.mu.RUnlock() fi, err := s.completed().Stat() return Completion{ Complete: err == nil && fi.Size() == s.mp.Length(), @@ -117,6 +129,8 @@ type SizedPutter interface { } func (s piecePerResourcePiece) MarkComplete() error { + s.mu.Lock() + defer s.mu.Unlock() incompleteChunks := s.getChunks() r, err := func() (io.ReadCloser, error) { if ccr, ok := s.rp.(ConsecutiveChunkReader); ok { @@ -155,10 +169,14 @@ func (s piecePerResourcePiece) MarkComplete() error { } func (s piecePerResourcePiece) MarkNotComplete() error { + s.mu.Lock() + defer s.mu.Unlock() return s.completed().Delete() } func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) { + s.mu.RLock() + defer s.mu.RUnlock() if s.mustIsComplete() { return s.completed().ReadAt(b, off) } @@ -166,6 +184,8 @@ func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) { } func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) { + s.mu.RLock() + defer s.mu.RUnlock() i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), strconv.FormatInt(off, 10))) if err != nil { panic(err) -- 2.48.1