resource.Provider
}
+type MovePrefixer interface {
+ MovePrefix(old, new string) error
+}
+
type ConsecutiveChunkReader interface {
ReadConsecutiveChunks(prefix string) (io.ReadCloser, error)
}
s.mu.RLock()
defer s.mu.RUnlock()
if s.mustIsComplete() {
- r, err := s.completed().Get()
+ if s.hasMovePrefix() {
+ if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
+ return s.writeConsecutiveChunks(ccr, s.completedDirPath(), w)
+ }
+ }
+ r, err := s.completedInstance().Get()
if err != nil {
return 0, fmt.Errorf("getting complete instance: %w", err)
}
return io.Copy(w, r)
}
if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
- return s.writeConsecutiveIncompleteChunks(ccr, w)
+ return s.writeConsecutiveChunks(ccr, s.incompleteDirPath(), w)
}
return io.Copy(w, io.NewSectionReader(s, 0, s.mp.Length()))
}
-func (s piecePerResourcePiece) writeConsecutiveIncompleteChunks(ccw ConsecutiveChunkReader, w io.Writer) (int64, error) {
- r, err := ccw.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
+func (s piecePerResourcePiece) writeConsecutiveChunks(
+ ccw ConsecutiveChunkReader,
+ dir string,
+ w io.Writer,
+) (int64, error) {
+ r, err := ccw.ReadConsecutiveChunks(dir + "/")
if err != nil {
return 0, err
}
}
s.mu.RLock()
defer s.mu.RUnlock()
- fi, err := s.completed().Stat()
+ fi, err := s.completedInstance().Stat()
+ if s.hasMovePrefix() {
+ return Completion{
+ Complete: err == nil && fi.Size() != 0,
+ Ok: true,
+ }
+ }
return Completion{
Complete: err == nil && fi.Size() == s.mp.Length(),
Ok: true,
func (s piecePerResourcePiece) MarkComplete() (err error) {
s.mu.Lock()
defer s.mu.Unlock()
- incompleteChunks := s.getChunks()
+ if mp, ok := s.rp.(MovePrefixer); ok {
+ err = mp.MovePrefix(s.incompleteDirPath()+"/", s.completedDirPath()+"/")
+ if err != nil {
+ err = fmt.Errorf("moving incomplete to complete: %w", err)
+ }
+ return
+ }
+ incompleteChunks := s.getChunks(s.incompleteDirPath())
r, err := func() (io.ReadCloser, error) {
if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
return ccr.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
return fmt.Errorf("getting incomplete chunks reader: %w", err)
}
defer r.Close()
- completedInstance := s.completed()
+ completedInstance := s.completedInstance()
err = func() error {
if sp, ok := completedInstance.(SizedPutter); ok && !s.opts.NoSizedPuts {
return sp.PutSized(r, s.mp.Length())
func (s piecePerResourcePiece) MarkNotComplete() error {
s.mu.Lock()
defer s.mu.Unlock()
- return s.completed().Delete()
+ return s.completedInstance().Delete()
}
func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
s.mu.RLock()
defer s.mu.RUnlock()
if s.mustIsComplete() {
- return s.completed().ReadAt(b, off)
+ if s.hasMovePrefix() {
+ chunks := s.getChunks(s.completedDirPath())
+ return chunks.ReadAt(b, off)
+ }
+ return s.completedInstance().ReadAt(b, off)
}
- return s.getChunks().ReadAt(b, off)
+ return s.getChunks(s.incompleteDirPath()).ReadAt(b, off)
}
func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) {
return n, err
}
-func (s piecePerResourcePiece) getChunks() (chunks chunks) {
- names, err := s.incompleteDir().Readdirnames()
+func (s piecePerResourcePiece) getChunks(dir string) (chunks chunks) {
+ names, err := s.dirInstance(dir).Readdirnames()
if err != nil {
return
}
if err != nil {
panic(err)
}
- i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), n))
+ i, err := s.rp.NewInstance(path.Join(dir, n))
if err != nil {
panic(err)
}
return
}
+func (s piecePerResourcePiece) completedDirPath() string {
+ if !s.hasMovePrefix() {
+ panic("not move prefixing")
+ }
+ return path.Join("completed", s.hashHex())
+}
+
func (s piecePerResourcePiece) completedInstancePath() string {
+ if s.hasMovePrefix() {
+ return s.completedDirPath() + "/0"
+ }
return path.Join("completed", s.hashHex())
}
-func (s piecePerResourcePiece) completed() resource.Instance {
+func (s piecePerResourcePiece) completedInstance() resource.Instance {
i, err := s.rp.NewInstance(s.completedInstancePath())
if err != nil {
panic(err)
return path.Join("incompleted", s.hashHex())
}
-func (s piecePerResourcePiece) incompleteDir() resource.DirInstance {
- i, err := s.rp.NewInstance(s.incompleteDirPath())
+func (s piecePerResourcePiece) dirInstance(path string) resource.DirInstance {
+ i, err := s.rp.NewInstance(path)
if err != nil {
panic(err)
}
func (me piecePerResourcePiece) hashHex() string {
return hex.EncodeToString(me.pieceHash.Unwrap())
}
+
+func (me piecePerResourcePiece) hasMovePrefix() bool {
+ _, ok := me.rp.(MovePrefixer)
+ return ok
+}