]> Sergey Matveev's repositories - btrtrc.git/commitdiff
possum: Use MovePrefix for completed chunks
authorMatt Joiner <anacrolix@gmail.com>
Sat, 9 Mar 2024 06:36:22 +0000 (17:36 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 3 Jun 2024 03:53:11 +0000 (13:53 +1000)
storage/piece-resource.go
storage/possum/possum-provider.go

index 1b0ae236a5f3c633afd7f95020eac046ec02045c..80b7f05f124c2536a5109a0af18999b98cf65038 100644 (file)
@@ -71,6 +71,10 @@ type PieceProvider interface {
        resource.Provider
 }
 
+type MovePrefixer interface {
+       MovePrefix(old, new string) error
+}
+
 type ConsecutiveChunkReader interface {
        ReadConsecutiveChunks(prefix string) (io.ReadCloser, error)
 }
@@ -95,7 +99,12 @@ func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) {
        s.mu.RLock()
        defer s.mu.RUnlock()
        if s.mustIsComplete() {
-               r, err := s.completed().Get()
+               if s.hasMovePrefix() {
+                       if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
+                               return s.writeConsecutiveChunks(ccr, s.completedDirPath(), w)
+                       }
+               }
+               r, err := s.completedInstance().Get()
                if err != nil {
                        return 0, fmt.Errorf("getting complete instance: %w", err)
                }
@@ -103,13 +112,17 @@ func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) {
                return io.Copy(w, r)
        }
        if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
-               return s.writeConsecutiveIncompleteChunks(ccr, w)
+               return s.writeConsecutiveChunks(ccr, s.incompleteDirPath(), w)
        }
        return io.Copy(w, io.NewSectionReader(s, 0, s.mp.Length()))
 }
 
-func (s piecePerResourcePiece) writeConsecutiveIncompleteChunks(ccw ConsecutiveChunkReader, w io.Writer) (int64, error) {
-       r, err := ccw.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
+func (s piecePerResourcePiece) writeConsecutiveChunks(
+       ccw ConsecutiveChunkReader,
+       dir string,
+       w io.Writer,
+) (int64, error) {
+       r, err := ccw.ReadConsecutiveChunks(dir + "/")
        if err != nil {
                return 0, err
        }
@@ -133,7 +146,13 @@ func (s piecePerResourcePiece) Completion() (_ Completion) {
        }
        s.mu.RLock()
        defer s.mu.RUnlock()
-       fi, err := s.completed().Stat()
+       fi, err := s.completedInstance().Stat()
+       if s.hasMovePrefix() {
+               return Completion{
+                       Complete: err == nil && fi.Size() != 0,
+                       Ok:       true,
+               }
+       }
        return Completion{
                Complete: err == nil && fi.Size() == s.mp.Length(),
                Ok:       true,
@@ -147,7 +166,14 @@ type SizedPutter interface {
 func (s piecePerResourcePiece) MarkComplete() (err error) {
        s.mu.Lock()
        defer s.mu.Unlock()
-       incompleteChunks := s.getChunks()
+       if mp, ok := s.rp.(MovePrefixer); ok {
+               err = mp.MovePrefix(s.incompleteDirPath()+"/", s.completedDirPath()+"/")
+               if err != nil {
+                       err = fmt.Errorf("moving incomplete to complete: %w", err)
+               }
+               return
+       }
+       incompleteChunks := s.getChunks(s.incompleteDirPath())
        r, err := func() (io.ReadCloser, error) {
                if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
                        return ccr.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
@@ -158,7 +184,7 @@ func (s piecePerResourcePiece) MarkComplete() (err error) {
                return fmt.Errorf("getting incomplete chunks reader: %w", err)
        }
        defer r.Close()
-       completedInstance := s.completed()
+       completedInstance := s.completedInstance()
        err = func() error {
                if sp, ok := completedInstance.(SizedPutter); ok && !s.opts.NoSizedPuts {
                        return sp.PutSized(r, s.mp.Length())
@@ -196,16 +222,20 @@ func (s piecePerResourcePiece) MarkComplete() (err error) {
 func (s piecePerResourcePiece) MarkNotComplete() error {
        s.mu.Lock()
        defer s.mu.Unlock()
-       return s.completed().Delete()
+       return s.completedInstance().Delete()
 }
 
 func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
        s.mu.RLock()
        defer s.mu.RUnlock()
        if s.mustIsComplete() {
-               return s.completed().ReadAt(b, off)
+               if s.hasMovePrefix() {
+                       chunks := s.getChunks(s.completedDirPath())
+                       return chunks.ReadAt(b, off)
+               }
+               return s.completedInstance().ReadAt(b, off)
        }
-       return s.getChunks().ReadAt(b, off)
+       return s.getChunks(s.incompleteDirPath()).ReadAt(b, off)
 }
 
 func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) {
@@ -253,8 +283,8 @@ func (me chunks) ReadAt(b []byte, off int64) (int, error) {
        return n, err
 }
 
-func (s piecePerResourcePiece) getChunks() (chunks chunks) {
-       names, err := s.incompleteDir().Readdirnames()
+func (s piecePerResourcePiece) getChunks(dir string) (chunks chunks) {
+       names, err := s.dirInstance(dir).Readdirnames()
        if err != nil {
                return
        }
@@ -263,7 +293,7 @@ func (s piecePerResourcePiece) getChunks() (chunks chunks) {
                if err != nil {
                        panic(err)
                }
-               i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), n))
+               i, err := s.rp.NewInstance(path.Join(dir, n))
                if err != nil {
                        panic(err)
                }
@@ -275,11 +305,21 @@ func (s piecePerResourcePiece) getChunks() (chunks chunks) {
        return
 }
 
+func (s piecePerResourcePiece) completedDirPath() string {
+       if !s.hasMovePrefix() {
+               panic("not move prefixing")
+       }
+       return path.Join("completed", s.hashHex())
+}
+
 func (s piecePerResourcePiece) completedInstancePath() string {
+       if s.hasMovePrefix() {
+               return s.completedDirPath() + "/0"
+       }
        return path.Join("completed", s.hashHex())
 }
 
-func (s piecePerResourcePiece) completed() resource.Instance {
+func (s piecePerResourcePiece) completedInstance() resource.Instance {
        i, err := s.rp.NewInstance(s.completedInstancePath())
        if err != nil {
                panic(err)
@@ -291,8 +331,8 @@ func (s piecePerResourcePiece) incompleteDirPath() string {
        return path.Join("incompleted", s.hashHex())
 }
 
-func (s piecePerResourcePiece) incompleteDir() resource.DirInstance {
-       i, err := s.rp.NewInstance(s.incompleteDirPath())
+func (s piecePerResourcePiece) dirInstance(path string) resource.DirInstance {
+       i, err := s.rp.NewInstance(path)
        if err != nil {
                panic(err)
        }
@@ -302,3 +342,8 @@ func (s piecePerResourcePiece) incompleteDir() resource.DirInstance {
 func (me piecePerResourcePiece) hashHex() string {
        return hex.EncodeToString(me.pieceHash.Unwrap())
 }
+
+func (me piecePerResourcePiece) hasMovePrefix() bool {
+       _, ok := me.rp.(MovePrefixer)
+       return ok
+}
index 9ad188ed48c655b878d88091f04a7272914659d0..9eff1bb249a55f8db9b73ef4493c4834a1a7892b 100644 (file)
@@ -137,3 +137,7 @@ func (pp Provider) writeConsecutiveValues(
        }
        return nil
 }
+
+func (pp Provider) MovePrefix(from, to string) (err error) {
+       return pp.Handle.MovePrefix([]byte(from), []byte(to))
+}