From 72e54fb353128ef85f00ec7c2c94b8648f1eb889 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 9 Jan 2020 22:58:23 +1100 Subject: [PATCH] Resource per piece storage: Store incomplete chunks separately --- client_test.go | 2 +- storage/issue95_test.go | 2 +- storage/piece_resource.go | 123 +++++++++++++++++++++++++++++++------- 3 files changed, 102 insertions(+), 25 deletions(-) diff --git a/client_test.go b/client_test.go index 056d6740..372f4bcd 100644 --- a/client_test.go +++ b/client_test.go @@ -21,7 +21,7 @@ import ( "github.com/anacrolix/dht/v2" _ "github.com/anacrolix/envpprof" "github.com/anacrolix/missinggo" - "github.com/anacrolix/missinggo/filecache" + "github.com/anacrolix/missinggo/v2/filecache" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/internal/testutil" diff --git a/storage/issue95_test.go b/storage/issue95_test.go index d47410da..4762d80e 100644 --- a/storage/issue95_test.go +++ b/storage/issue95_test.go @@ -5,7 +5,7 @@ import ( "os" "testing" - "github.com/anacrolix/missinggo/resource" + "github.com/anacrolix/missinggo/v2/resource" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/storage/piece_resource.go b/storage/piece_resource.go index 1064fae5..e57e8a89 100644 --- a/storage/piece_resource.go +++ b/storage/piece_resource.go @@ -1,9 +1,13 @@ package storage import ( + "bytes" + "io" "path" + "sort" + "strconv" - "github.com/anacrolix/missinggo/resource" + "github.com/anacrolix/missinggo/v2/resource" "github.com/anacrolix/torrent/metainfo" ) @@ -27,51 +31,124 @@ func (s *piecePerResource) Close() error { } func (s *piecePerResource) Piece(p metainfo.Piece) PieceImpl { - completed, err := s.p.NewInstance(path.Join("completed", p.Hash().HexString())) - if err != nil { - panic(err) - } - incomplete, err := s.p.NewInstance(path.Join("incomplete", p.Hash().HexString())) - if err != nil { - panic(err) - } return piecePerResourcePiece{ - p: p, - c: completed, - i: incomplete, + mp: p, + rp: s.p, } } type piecePerResourcePiece struct { - p metainfo.Piece - c resource.Instance - i resource.Instance + mp metainfo.Piece + rp resource.Provider } func (s piecePerResourcePiece) Completion() Completion { - fi, err := s.c.Stat() + fi, err := s.completed().Stat() return Completion{ - Complete: err == nil && fi.Size() == s.p.Length(), + Complete: err == nil && fi.Size() == s.mp.Length(), Ok: true, } } func (s piecePerResourcePiece) MarkComplete() error { - return resource.Move(s.i, s.c) + incompleteChunks := s.getChunks() + err := s.completed().Put(io.NewSectionReader(incompleteChunks, 0, s.mp.Length())) + if err == nil { + for _, c := range incompleteChunks { + c.instance.Delete() + } + } + return err } func (s piecePerResourcePiece) MarkNotComplete() error { - return s.c.Delete() + return s.completed().Delete() } func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) { if s.Completion().Complete { - return s.c.ReadAt(b, off) - } else { - return s.i.ReadAt(b, off) + return s.completed().ReadAt(b, off) } + return s.getChunks().ReadAt(b, off) } func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) { - return s.i.WriteAt(b, off) + i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), strconv.FormatInt(off, 10))) + if err != nil { + panic(err) + } + r := bytes.NewReader(b) + err = i.Put(r) + n = len(b) - r.Len() + return +} + +type chunk struct { + offset int64 + instance resource.Instance +} + +type chunks []chunk + +func (me chunks) ReadAt(b []byte, off int64) (int, error) { + for { + if len(me) == 0 { + return 0, io.EOF + } + if me[0].offset <= off { + break + } + me = me[1:] + } + n, err := me[0].instance.ReadAt(b, off-me[0].offset) + if n == len(b) { + return n, nil + } + if err == nil || err == io.EOF { + n_, err := me[1:].ReadAt(b[n:], off+int64(n)) + return n + n_, err + } + return n, err +} + +func (s piecePerResourcePiece) getChunks() (chunks chunks) { + names, err := s.incompleteDir().Readdirnames() + if err != nil { + return + } + for _, n := range names { + offset, err := strconv.ParseInt(n, 10, 64) + if err != nil { + continue + } + i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), n)) + if err != nil { + panic(err) + } + chunks = append(chunks, chunk{offset, i}) + } + sort.Slice(chunks, func(i, j int) bool { + return chunks[i].offset < chunks[j].offset + }) + return +} + +func (s piecePerResourcePiece) completed() resource.Instance { + i, err := s.rp.NewInstance(path.Join("completed", s.mp.Hash().HexString())) + if err != nil { + panic(err) + } + return i +} + +func (s piecePerResourcePiece) incompleteDirPath() string { + return path.Join("incompleted", s.mp.Hash().HexString()) +} + +func (s piecePerResourcePiece) incompleteDir() resource.DirInstance { + i, err := s.rp.NewInstance(s.incompleteDirPath()) + if err != nil { + panic(err) + } + return i.(resource.DirInstance) } -- 2.48.1