From: Matt Joiner Date: Sat, 21 Nov 2020 02:41:45 +0000 (+1100) Subject: Use ConsecutiveChunkWriter in MarkComplete for piece resource storage if available X-Git-Tag: v1.19.0~13 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=b35c204238ace67b6d76a07158470909f8ff01c4;p=btrtrc.git Use ConsecutiveChunkWriter in MarkComplete for piece resource storage if available --- diff --git a/storage/piece_resource.go b/storage/piece_resource.go index 36088f32..02c1e7e8 100644 --- a/storage/piece_resource.go +++ b/storage/piece_resource.go @@ -61,12 +61,16 @@ func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) { if s.mustIsComplete() { return ccw.WriteConsecutiveChunks(s.completedInstancePath(), w) } else { - return ccw.WriteConsecutiveChunks(s.incompleteDirPath()+"/", w) + return s.writeConsecutiveIncompleteChunks(ccw, w) } } return io.Copy(w, io.NewSectionReader(s, 0, s.mp.Length())) } +func (s piecePerResourcePiece) writeConsecutiveIncompleteChunks(ccw ConsecutiveChunkWriter, w io.Writer) (int64, error) { + return ccw.WriteConsecutiveChunks(s.incompleteDirPath()+"/", w) +} + // Returns if the piece is complete. Ok should be true, because we are the definitive source of // truth here. func (s piecePerResourcePiece) mustIsComplete() bool { @@ -87,7 +91,17 @@ func (s piecePerResourcePiece) Completion() Completion { func (s piecePerResourcePiece) MarkComplete() error { incompleteChunks := s.getChunks() - err := s.completed().Put(io.NewSectionReader(incompleteChunks, 0, s.mp.Length())) + r, w := io.Pipe() + go func() { + var err error + if ccw, ok := s.rp.(ConsecutiveChunkWriter); ok { + _, err = s.writeConsecutiveIncompleteChunks(ccw, w) + } else { + _, err = io.Copy(w, io.NewSectionReader(incompleteChunks, 0, s.mp.Length())) + } + w.CloseWithError(err) + }() + err := s.completed().Put(r) if err == nil { for _, c := range incompleteChunks { c.instance.Delete()