From 6061bc54b3c77f47e862abb7ad1cfcc4420ce83f Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sat, 9 Mar 2024 17:36:22 +1100 Subject: [PATCH] possum: Use MovePrefix for completed chunks --- storage/piece-resource.go | 77 ++++++++++++++++++++++++------- storage/possum/possum-provider.go | 4 ++ 2 files changed, 65 insertions(+), 16 deletions(-) diff --git a/storage/piece-resource.go b/storage/piece-resource.go index 1b0ae236..80b7f05f 100644 --- a/storage/piece-resource.go +++ b/storage/piece-resource.go @@ -71,6 +71,10 @@ type PieceProvider interface { resource.Provider } +type MovePrefixer interface { + MovePrefix(old, new string) error +} + type ConsecutiveChunkReader interface { ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) } @@ -95,7 +99,12 @@ func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) { s.mu.RLock() defer s.mu.RUnlock() if s.mustIsComplete() { - r, err := s.completed().Get() + if s.hasMovePrefix() { + if ccr, ok := s.rp.(ConsecutiveChunkReader); ok { + return s.writeConsecutiveChunks(ccr, s.completedDirPath(), w) + } + } + r, err := s.completedInstance().Get() if err != nil { return 0, fmt.Errorf("getting complete instance: %w", err) } @@ -103,13 +112,17 @@ func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) { return io.Copy(w, r) } if ccr, ok := s.rp.(ConsecutiveChunkReader); ok { - return s.writeConsecutiveIncompleteChunks(ccr, w) + return s.writeConsecutiveChunks(ccr, s.incompleteDirPath(), w) } return io.Copy(w, io.NewSectionReader(s, 0, s.mp.Length())) } -func (s piecePerResourcePiece) writeConsecutiveIncompleteChunks(ccw ConsecutiveChunkReader, w io.Writer) (int64, error) { - r, err := ccw.ReadConsecutiveChunks(s.incompleteDirPath() + "/") +func (s piecePerResourcePiece) writeConsecutiveChunks( + ccw ConsecutiveChunkReader, + dir string, + w io.Writer, +) (int64, error) { + r, err := ccw.ReadConsecutiveChunks(dir + "/") if err != nil { return 0, err } @@ -133,7 +146,13 @@ func (s piecePerResourcePiece) Completion() (_ Completion) { } s.mu.RLock() defer s.mu.RUnlock() - fi, err := s.completed().Stat() + fi, err := s.completedInstance().Stat() + if s.hasMovePrefix() { + return Completion{ + Complete: err == nil && fi.Size() != 0, + Ok: true, + } + } return Completion{ Complete: err == nil && fi.Size() == s.mp.Length(), Ok: true, @@ -147,7 +166,14 @@ type SizedPutter interface { func (s piecePerResourcePiece) MarkComplete() (err error) { s.mu.Lock() defer s.mu.Unlock() - incompleteChunks := s.getChunks() + if mp, ok := s.rp.(MovePrefixer); ok { + err = mp.MovePrefix(s.incompleteDirPath()+"/", s.completedDirPath()+"/") + if err != nil { + err = fmt.Errorf("moving incomplete to complete: %w", err) + } + return + } + incompleteChunks := s.getChunks(s.incompleteDirPath()) r, err := func() (io.ReadCloser, error) { if ccr, ok := s.rp.(ConsecutiveChunkReader); ok { return ccr.ReadConsecutiveChunks(s.incompleteDirPath() + "/") @@ -158,7 +184,7 @@ func (s piecePerResourcePiece) MarkComplete() (err error) { return fmt.Errorf("getting incomplete chunks reader: %w", err) } defer r.Close() - completedInstance := s.completed() + completedInstance := s.completedInstance() err = func() error { if sp, ok := completedInstance.(SizedPutter); ok && !s.opts.NoSizedPuts { return sp.PutSized(r, s.mp.Length()) @@ -196,16 +222,20 @@ func (s piecePerResourcePiece) MarkComplete() (err error) { func (s piecePerResourcePiece) MarkNotComplete() error { s.mu.Lock() defer s.mu.Unlock() - return s.completed().Delete() + return s.completedInstance().Delete() } func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) { s.mu.RLock() defer s.mu.RUnlock() if s.mustIsComplete() { - return s.completed().ReadAt(b, off) + if s.hasMovePrefix() { + chunks := s.getChunks(s.completedDirPath()) + return chunks.ReadAt(b, off) + } + return s.completedInstance().ReadAt(b, off) } - return s.getChunks().ReadAt(b, off) + return s.getChunks(s.incompleteDirPath()).ReadAt(b, off) } func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) { @@ -253,8 +283,8 @@ func (me chunks) ReadAt(b []byte, off int64) (int, error) { return n, err } -func (s piecePerResourcePiece) getChunks() (chunks chunks) { - names, err := s.incompleteDir().Readdirnames() +func (s piecePerResourcePiece) getChunks(dir string) (chunks chunks) { + names, err := s.dirInstance(dir).Readdirnames() if err != nil { return } @@ -263,7 +293,7 @@ func (s piecePerResourcePiece) getChunks() (chunks chunks) { if err != nil { panic(err) } - i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), n)) + i, err := s.rp.NewInstance(path.Join(dir, n)) if err != nil { panic(err) } @@ -275,11 +305,21 @@ func (s piecePerResourcePiece) getChunks() (chunks chunks) { return } +func (s piecePerResourcePiece) completedDirPath() string { + if !s.hasMovePrefix() { + panic("not move prefixing") + } + return path.Join("completed", s.hashHex()) +} + func (s piecePerResourcePiece) completedInstancePath() string { + if s.hasMovePrefix() { + return s.completedDirPath() + "/0" + } return path.Join("completed", s.hashHex()) } -func (s piecePerResourcePiece) completed() resource.Instance { +func (s piecePerResourcePiece) completedInstance() resource.Instance { i, err := s.rp.NewInstance(s.completedInstancePath()) if err != nil { panic(err) @@ -291,8 +331,8 @@ func (s piecePerResourcePiece) incompleteDirPath() string { return path.Join("incompleted", s.hashHex()) } -func (s piecePerResourcePiece) incompleteDir() resource.DirInstance { - i, err := s.rp.NewInstance(s.incompleteDirPath()) +func (s piecePerResourcePiece) dirInstance(path string) resource.DirInstance { + i, err := s.rp.NewInstance(path) if err != nil { panic(err) } @@ -302,3 +342,8 @@ func (s piecePerResourcePiece) incompleteDir() resource.DirInstance { func (me piecePerResourcePiece) hashHex() string { return hex.EncodeToString(me.pieceHash.Unwrap()) } + +func (me piecePerResourcePiece) hasMovePrefix() bool { + _, ok := me.rp.(MovePrefixer) + return ok +} diff --git a/storage/possum/possum-provider.go b/storage/possum/possum-provider.go index 9ad188ed..9eff1bb2 100644 --- a/storage/possum/possum-provider.go +++ b/storage/possum/possum-provider.go @@ -137,3 +137,7 @@ func (pp Provider) writeConsecutiveValues( } return nil } + +func (pp Provider) MovePrefix(from, to string) (err error) { + return pp.Handle.MovePrefix([]byte(from), []byte(to)) +} -- 2.48.1