]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Reimplement piece storage
authorMatt Joiner <anacrolix@gmail.com>
Tue, 29 Mar 2016 00:14:34 +0000 (11:14 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 29 Mar 2016 00:14:34 +0000 (11:14 +1100)
client_test.go
storage/piece_file.go [new file with mode: 0644]

index faded767ef37fddce923be879c8a7e16db9c2fb5..597b67b71a079b2b7eeb0fa42fc9a28b0db1f606 100644 (file)
@@ -19,6 +19,7 @@ import (
        _ "github.com/anacrolix/envpprof"
        "github.com/anacrolix/missinggo"
        . "github.com/anacrolix/missinggo"
+       "github.com/anacrolix/missinggo/filecache"
        "github.com/anacrolix/utp"
        "github.com/bradfitz/iter"
        "github.com/stretchr/testify/assert"
@@ -310,17 +311,12 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
        leecherDataDir, err := ioutil.TempDir("", "")
        require.NoError(t, err)
        defer os.RemoveAll(leecherDataDir)
-       // cfg.TorrentDataOpener = func() TorrentDataOpener {
-       //      fc, err := filecache.NewCache(leecherDataDir)
-       //      require.NoError(t, err)
-       //      if ps.SetLeecherStorageCapacity {
-       //              fc.SetCapacity(ps.LeecherStorageCapacity)
-       //      }
-       //      store := pieceStore.New(fileCacheDataBackend.New(fc))
-       //      return func(mi *metainfo.Info) storage.I {
-       //              return store.OpenTorrentData(mi)
-       //      }
-       // }()
+       fc, err := filecache.NewCache(leecherDataDir)
+       require.NoError(t, err)
+       if ps.SetLeecherStorageCapacity {
+               fc.SetCapacity(ps.LeecherStorageCapacity)
+       }
+       cfg.DefaultStorage = storage.NewPieceFileStorage(storage.FileCacheFileStore{fc})
        leecher, err := NewClient(&cfg)
        require.NoError(t, err)
        defer leecher.Close()
@@ -719,55 +715,64 @@ func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
        }
 }
 
-// func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool) {
-//     fileCacheDir, err := ioutil.TempDir("", "")
-//     require.NoError(t, err)
-//     defer os.RemoveAll(fileCacheDir)
-//     fileCache, err := filecache.NewCache(fileCacheDir)
-//     require.NoError(t, err)
-//     greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
-//     defer os.RemoveAll(greetingDataTempDir)
-//     filePieceStore := pieceStore.New(fileCacheDataBackend.New(fileCache))
-//     greetingData := filePieceStore.OpenTorrentData(&greetingMetainfo.Info.Info)
-//     written, err := greetingData.WriteAt([]byte(testutil.GreetingFileContents), 0)
-//     require.Equal(t, len(testutil.GreetingFileContents), written)
-//     require.NoError(t, err)
-//     for i := 0; i < greetingMetainfo.Info.NumPieces(); i++ {
-//             // p := greetingMetainfo.Info.Piece(i)
-//             if alreadyCompleted {
-//                     err := greetingData.PieceCompleted(i)
-//                     assert.NoError(t, err)
-//             }
-//     }
-//     cfg := TestingConfig
-//     // TODO: Disable network option?
-//     cfg.DisableTCP = true
-//     cfg.DisableUTP = true
-//     // cfg.DefaultStorage = filePieceStore
-//     cl, err := NewClient(&cfg)
-//     require.NoError(t, err)
-//     defer cl.Close()
-//     tt, err := cl.AddTorrent(greetingMetainfo)
-//     require.NoError(t, err)
-//     psrs := tt.PieceStateRuns()
-//     assert.Len(t, psrs, 1)
-//     assert.EqualValues(t, 3, psrs[0].Length)
-//     assert.Equal(t, alreadyCompleted, psrs[0].Complete)
-//     if alreadyCompleted {
-//             r := tt.NewReader()
-//             b, err := ioutil.ReadAll(r)
-//             assert.NoError(t, err)
-//             assert.EqualValues(t, testutil.GreetingFileContents, b)
-//     }
-// }
-
-// func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
-//     testAddTorrentPriorPieceCompletion(t, true)
-// }
-
-// func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
-//     testAddTorrentPriorPieceCompletion(t, false)
-// }
+func writeTorrentData(ts storage.Torrent, info *metainfo.InfoEx, b []byte) {
+       for i := range iter.N(info.NumPieces()) {
+               n, err := ts.Piece(info.Piece(i)).WriteAt(b, 0)
+               b = b[n:]
+               log.Print(err)
+       }
+}
+
+func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool) {
+       fileCacheDir, err := ioutil.TempDir("", "")
+       require.NoError(t, err)
+       defer os.RemoveAll(fileCacheDir)
+       fileCache, err := filecache.NewCache(fileCacheDir)
+       require.NoError(t, err)
+       greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
+       defer os.RemoveAll(greetingDataTempDir)
+       filePieceStore := storage.NewPieceFileStorage(storage.FileCacheFileStore{fileCache})
+       greetingData, err := filePieceStore.OpenTorrent(&greetingMetainfo.Info)
+       require.NoError(t, err)
+       writeTorrentData(greetingData, &greetingMetainfo.Info, []byte(testutil.GreetingFileContents))
+       // require.Equal(t, len(testutil.GreetingFileContents), written)
+       // require.NoError(t, err)
+       for i := 0; i < greetingMetainfo.Info.NumPieces(); i++ {
+               p := greetingMetainfo.Info.Piece(i)
+               if alreadyCompleted {
+                       err := greetingData.Piece(p).MarkComplete()
+                       assert.NoError(t, err)
+               }
+       }
+       cfg := TestingConfig
+       // TODO: Disable network option?
+       cfg.DisableTCP = true
+       cfg.DisableUTP = true
+       cfg.DefaultStorage = filePieceStore
+       cl, err := NewClient(&cfg)
+       require.NoError(t, err)
+       defer cl.Close()
+       tt, err := cl.AddTorrent(greetingMetainfo)
+       require.NoError(t, err)
+       psrs := tt.PieceStateRuns()
+       assert.Len(t, psrs, 1)
+       assert.EqualValues(t, 3, psrs[0].Length)
+       assert.Equal(t, alreadyCompleted, psrs[0].Complete)
+       if alreadyCompleted {
+               r := tt.NewReader()
+               b, err := ioutil.ReadAll(r)
+               assert.NoError(t, err)
+               assert.EqualValues(t, testutil.GreetingFileContents, b)
+       }
+}
+
+func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
+       testAddTorrentPriorPieceCompletion(t, true)
+}
+
+func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
+       testAddTorrentPriorPieceCompletion(t, false)
+}
 
 func TestAddMetainfoWithNodes(t *testing.T) {
        cfg := TestingConfig
@@ -812,17 +817,12 @@ func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
        leecherDataDir, err := ioutil.TempDir("", "")
        require.NoError(t, err)
        defer os.RemoveAll(leecherDataDir)
-       // cfg.TorrentDataOpener = func() TorrentDataOpener {
-       //      fc, err := filecache.NewCache(leecherDataDir)
-       //      require.NoError(t, err)
-       //      if ps.SetLeecherStorageCapacity {
-       //              fc.SetCapacity(ps.LeecherStorageCapacity)
-       //      }
-       //      store := pieceStore.New(fileCacheDataBackend.New(fc))
-       //      return func(mi *metainfo.Info) storage.I {
-       //              return store.OpenTorrentData(mi)
-       //      }
-       // }()
+       fc, err := filecache.NewCache(leecherDataDir)
+       require.NoError(t, err)
+       if ps.SetLeecherStorageCapacity {
+               fc.SetCapacity(ps.LeecherStorageCapacity)
+       }
+       cfg.DefaultStorage = storage.NewPieceFileStorage(storage.FileCacheFileStore{fc})
        cfg.DataDir = leecherDataDir
        leecher, _ := NewClient(&cfg)
        defer leecher.Close()
diff --git a/storage/piece_file.go b/storage/piece_file.go
new file mode 100644 (file)
index 0000000..0e32375
--- /dev/null
@@ -0,0 +1,102 @@
+package storage
+
+import (
+       "os"
+       "path"
+
+       "github.com/anacrolix/missinggo"
+       "github.com/anacrolix/missinggo/filecache"
+
+       "github.com/anacrolix/torrent/metainfo"
+)
+
+type FileStore interface {
+       OpenFile(path string, flags int) (File, error)
+       Stat(path string) (os.FileInfo, error)
+       Rename(from, to string) error
+}
+
+type File interface {
+       WriteAt([]byte, int64) (int, error)
+       ReadAt([]byte, int64) (int, error)
+       Close() error
+}
+
+type FileCacheFileStore struct {
+       *filecache.Cache
+}
+
+func (me FileCacheFileStore) OpenFile(p string, f int) (File, error) {
+       return me.Cache.OpenFile(p, f)
+}
+
+type pieceFileStorage struct {
+       fs FileStore
+}
+
+func NewPieceFileStorage(fs FileStore) I {
+       return &pieceFileStorage{
+               fs: fs,
+       }
+}
+
+type pieceFileTorrentStorage struct {
+       s *pieceFileStorage
+}
+
+func (me *pieceFileStorage) OpenTorrent(info *metainfo.InfoEx) (Torrent, error) {
+       return &pieceFileTorrentStorage{me}, nil
+}
+
+func (me *pieceFileTorrentStorage) Close() error {
+       return nil
+}
+
+func (me *pieceFileTorrentStorage) Piece(p metainfo.Piece) Piece {
+       return pieceFileTorrentStoragePiece{me, p, me.s.fs}
+}
+
+type pieceFileTorrentStoragePiece struct {
+       ts *pieceFileTorrentStorage
+       p  metainfo.Piece
+       fs FileStore
+}
+
+func (me pieceFileTorrentStoragePiece) completedPath() string {
+       return path.Join("completed", me.p.Hash().HexString())
+}
+
+func (me pieceFileTorrentStoragePiece) incompletePath() string {
+       return path.Join("incomplete", me.p.Hash().HexString())
+}
+
+func (me pieceFileTorrentStoragePiece) GetIsComplete() bool {
+       fi, err := me.ts.s.fs.Stat(me.completedPath())
+       return err == nil && fi.Size() == me.p.Length()
+}
+
+func (me pieceFileTorrentStoragePiece) MarkComplete() error {
+       return me.fs.Rename(me.incompletePath(), me.completedPath())
+}
+
+func (me pieceFileTorrentStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
+       f, err := me.fs.OpenFile(me.completedPath(), os.O_RDONLY)
+       if err != nil {
+               f, err = me.fs.OpenFile(me.incompletePath(), os.O_RDONLY)
+               if err != nil {
+                       return
+               }
+       }
+       defer f.Close()
+       return f.ReadAt(b, off)
+}
+
+func (me pieceFileTorrentStoragePiece) WriteAt(b []byte, off int64) (n int, err error) {
+       f, err := me.fs.OpenFile(me.incompletePath(), os.O_WRONLY|os.O_CREATE)
+       if err != nil {
+               return
+       }
+       defer f.Close()
+       missinggo.LimitLen(&b, me.p.Length()-off)
+       return f.WriteAt(b, off)
+}