]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Use locks on piece per resource pieces to prevent races in reading incomplete data
authorMatt Joiner <anacrolix@gmail.com>
Thu, 4 Feb 2021 07:42:43 +0000 (18:42 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 4 Feb 2021 08:18:59 +0000 (19:18 +1100)
storage/piece_resource.go

index c723831fd3e0626e23589f489fa377a26346606b..66d877fbe6c97d4c37dd7a0538e0999eb6ebec83 100644 (file)
@@ -38,6 +38,7 @@ func NewResourcePiecesOpts(p PieceProvider, opts ResourcePiecesOpts) ClientImpl
 
 type piecePerResourceTorrentImpl struct {
        piecePerResource
+       locks []sync.RWMutex
 }
 
 func (piecePerResourceTorrentImpl) Close() error {
@@ -45,13 +46,17 @@ func (piecePerResourceTorrentImpl) Close() error {
 }
 
 func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
-       return piecePerResourceTorrentImpl{s}, nil
+       return piecePerResourceTorrentImpl{
+               s,
+               make([]sync.RWMutex, info.NumPieces()),
+       }, nil
 }
 
-func (s piecePerResource) Piece(p metainfo.Piece) PieceImpl {
+func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece) PieceImpl {
        return piecePerResourcePiece{
                mp:               p,
-               piecePerResource: s,
+               piecePerResource: s.piecePerResource,
+               mu:               &s.locks[p.Index()],
        }
 }
 
@@ -66,11 +71,16 @@ type ConsecutiveChunkReader interface {
 type piecePerResourcePiece struct {
        mp metainfo.Piece
        piecePerResource
+       // This protects operations that move complete/incomplete pieces around, which can trigger read
+       // errors that may cause callers to do more drastic things.
+       mu *sync.RWMutex
 }
 
 var _ io.WriterTo = piecePerResourcePiece{}
 
 func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) {
+       s.mu.RLock()
+       defer s.mu.RUnlock()
        if s.mustIsComplete() {
                r, err := s.completed().Get()
                if err != nil {
@@ -105,6 +115,8 @@ func (s piecePerResourcePiece) mustIsComplete() bool {
 }
 
 func (s piecePerResourcePiece) Completion() Completion {
+       s.mu.RLock()
+       defer s.mu.RUnlock()
        fi, err := s.completed().Stat()
        return Completion{
                Complete: err == nil && fi.Size() == s.mp.Length(),
@@ -117,6 +129,8 @@ type SizedPutter interface {
 }
 
 func (s piecePerResourcePiece) MarkComplete() error {
+       s.mu.Lock()
+       defer s.mu.Unlock()
        incompleteChunks := s.getChunks()
        r, err := func() (io.ReadCloser, error) {
                if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
@@ -155,10 +169,14 @@ func (s piecePerResourcePiece) MarkComplete() error {
 }
 
 func (s piecePerResourcePiece) MarkNotComplete() error {
+       s.mu.Lock()
+       defer s.mu.Unlock()
        return s.completed().Delete()
 }
 
 func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
+       s.mu.RLock()
+       defer s.mu.RUnlock()
        if s.mustIsComplete() {
                return s.completed().ReadAt(b, off)
        }
@@ -166,6 +184,8 @@ func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
 }
 
 func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) {
+       s.mu.RLock()
+       defer s.mu.RUnlock()
        i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), strconv.FormatInt(off, 10)))
        if err != nil {
                panic(err)