]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Resource per piece storage: Store incomplete chunks separately
authorMatt Joiner <anacrolix@gmail.com>
Thu, 9 Jan 2020 11:58:23 +0000 (22:58 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 13 Jan 2020 23:51:09 +0000 (10:51 +1100)
client_test.go
storage/issue95_test.go
storage/piece_resource.go

index 056d674080daaba587eb4fc77daa6a5db725e7aa..372f4bcdc09091a9fcb4a82f1f9b324dd9677e30 100644 (file)
@@ -21,7 +21,7 @@ import (
        "github.com/anacrolix/dht/v2"
        _ "github.com/anacrolix/envpprof"
        "github.com/anacrolix/missinggo"
-       "github.com/anacrolix/missinggo/filecache"
+       "github.com/anacrolix/missinggo/v2/filecache"
 
        "github.com/anacrolix/torrent/bencode"
        "github.com/anacrolix/torrent/internal/testutil"
index d47410dae3c287c56c9b2d6bb66a4f309f75cc3a..4762d80e105b58d3c34a740320fc162d21c537f0 100644 (file)
@@ -5,7 +5,7 @@ import (
        "os"
        "testing"
 
-       "github.com/anacrolix/missinggo/resource"
+       "github.com/anacrolix/missinggo/v2/resource"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 
index 1064fae557e9c4559a13253c751810c13f1b839b..e57e8a8940c78b819accb9ad2bafce230d23fd84 100644 (file)
@@ -1,9 +1,13 @@
 package storage
 
 import (
+       "bytes"
+       "io"
        "path"
+       "sort"
+       "strconv"
 
-       "github.com/anacrolix/missinggo/resource"
+       "github.com/anacrolix/missinggo/v2/resource"
 
        "github.com/anacrolix/torrent/metainfo"
 )
@@ -27,51 +31,124 @@ func (s *piecePerResource) Close() error {
 }
 
 func (s *piecePerResource) Piece(p metainfo.Piece) PieceImpl {
-       completed, err := s.p.NewInstance(path.Join("completed", p.Hash().HexString()))
-       if err != nil {
-               panic(err)
-       }
-       incomplete, err := s.p.NewInstance(path.Join("incomplete", p.Hash().HexString()))
-       if err != nil {
-               panic(err)
-       }
        return piecePerResourcePiece{
-               p: p,
-               c: completed,
-               i: incomplete,
+               mp: p,
+               rp: s.p,
        }
 }
 
 type piecePerResourcePiece struct {
-       p metainfo.Piece
-       c resource.Instance
-       i resource.Instance
+       mp metainfo.Piece
+       rp resource.Provider
 }
 
 func (s piecePerResourcePiece) Completion() Completion {
-       fi, err := s.c.Stat()
+       fi, err := s.completed().Stat()
        return Completion{
-               Complete: err == nil && fi.Size() == s.p.Length(),
+               Complete: err == nil && fi.Size() == s.mp.Length(),
                Ok:       true,
        }
 }
 
 func (s piecePerResourcePiece) MarkComplete() error {
-       return resource.Move(s.i, s.c)
+       incompleteChunks := s.getChunks()
+       err := s.completed().Put(io.NewSectionReader(incompleteChunks, 0, s.mp.Length()))
+       if err == nil {
+               for _, c := range incompleteChunks {
+                       c.instance.Delete()
+               }
+       }
+       return err
 }
 
 func (s piecePerResourcePiece) MarkNotComplete() error {
-       return s.c.Delete()
+       return s.completed().Delete()
 }
 
 func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
        if s.Completion().Complete {
-               return s.c.ReadAt(b, off)
-       } else {
-               return s.i.ReadAt(b, off)
+               return s.completed().ReadAt(b, off)
        }
+       return s.getChunks().ReadAt(b, off)
 }
 
 func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) {
-       return s.i.WriteAt(b, off)
+       i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), strconv.FormatInt(off, 10)))
+       if err != nil {
+               panic(err)
+       }
+       r := bytes.NewReader(b)
+       err = i.Put(r)
+       n = len(b) - r.Len()
+       return
+}
+
+type chunk struct {
+       offset   int64
+       instance resource.Instance
+}
+
+type chunks []chunk
+
+func (me chunks) ReadAt(b []byte, off int64) (int, error) {
+       for {
+               if len(me) == 0 {
+                       return 0, io.EOF
+               }
+               if me[0].offset <= off {
+                       break
+               }
+               me = me[1:]
+       }
+       n, err := me[0].instance.ReadAt(b, off-me[0].offset)
+       if n == len(b) {
+               return n, nil
+       }
+       if err == nil || err == io.EOF {
+               n_, err := me[1:].ReadAt(b[n:], off+int64(n))
+               return n + n_, err
+       }
+       return n, err
+}
+
+func (s piecePerResourcePiece) getChunks() (chunks chunks) {
+       names, err := s.incompleteDir().Readdirnames()
+       if err != nil {
+               return
+       }
+       for _, n := range names {
+               offset, err := strconv.ParseInt(n, 10, 64)
+               if err != nil {
+                       continue
+               }
+               i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), n))
+               if err != nil {
+                       panic(err)
+               }
+               chunks = append(chunks, chunk{offset, i})
+       }
+       sort.Slice(chunks, func(i, j int) bool {
+               return chunks[i].offset < chunks[j].offset
+       })
+       return
+}
+
+func (s piecePerResourcePiece) completed() resource.Instance {
+       i, err := s.rp.NewInstance(path.Join("completed", s.mp.Hash().HexString()))
+       if err != nil {
+               panic(err)
+       }
+       return i
+}
+
+func (s piecePerResourcePiece) incompleteDirPath() string {
+       return path.Join("incompleted", s.mp.Hash().HexString())
+}
+
+func (s piecePerResourcePiece) incompleteDir() resource.DirInstance {
+       i, err := s.rp.NewInstance(s.incompleteDirPath())
+       if err != nil {
+               panic(err)
+       }
+       return i.(resource.DirInstance)
 }