_ "github.com/anacrolix/envpprof"
"github.com/anacrolix/missinggo"
. "github.com/anacrolix/missinggo"
+ "github.com/anacrolix/missinggo/filecache"
"github.com/anacrolix/utp"
"github.com/bradfitz/iter"
"github.com/stretchr/testify/assert"
leecherDataDir, err := ioutil.TempDir("", "")
require.NoError(t, err)
defer os.RemoveAll(leecherDataDir)
- // cfg.TorrentDataOpener = func() TorrentDataOpener {
- // fc, err := filecache.NewCache(leecherDataDir)
- // require.NoError(t, err)
- // if ps.SetLeecherStorageCapacity {
- // fc.SetCapacity(ps.LeecherStorageCapacity)
- // }
- // store := pieceStore.New(fileCacheDataBackend.New(fc))
- // return func(mi *metainfo.Info) storage.I {
- // return store.OpenTorrentData(mi)
- // }
- // }()
+ fc, err := filecache.NewCache(leecherDataDir)
+ require.NoError(t, err)
+ if ps.SetLeecherStorageCapacity {
+ fc.SetCapacity(ps.LeecherStorageCapacity)
+ }
+ cfg.DefaultStorage = storage.NewPieceFileStorage(storage.FileCacheFileStore{fc})
leecher, err := NewClient(&cfg)
require.NoError(t, err)
defer leecher.Close()
}
}
-// func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool) {
-// fileCacheDir, err := ioutil.TempDir("", "")
-// require.NoError(t, err)
-// defer os.RemoveAll(fileCacheDir)
-// fileCache, err := filecache.NewCache(fileCacheDir)
-// require.NoError(t, err)
-// greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
-// defer os.RemoveAll(greetingDataTempDir)
-// filePieceStore := pieceStore.New(fileCacheDataBackend.New(fileCache))
-// greetingData := filePieceStore.OpenTorrentData(&greetingMetainfo.Info.Info)
-// written, err := greetingData.WriteAt([]byte(testutil.GreetingFileContents), 0)
-// require.Equal(t, len(testutil.GreetingFileContents), written)
-// require.NoError(t, err)
-// for i := 0; i < greetingMetainfo.Info.NumPieces(); i++ {
-// // p := greetingMetainfo.Info.Piece(i)
-// if alreadyCompleted {
-// err := greetingData.PieceCompleted(i)
-// assert.NoError(t, err)
-// }
-// }
-// cfg := TestingConfig
-// // TODO: Disable network option?
-// cfg.DisableTCP = true
-// cfg.DisableUTP = true
-// // cfg.DefaultStorage = filePieceStore
-// cl, err := NewClient(&cfg)
-// require.NoError(t, err)
-// defer cl.Close()
-// tt, err := cl.AddTorrent(greetingMetainfo)
-// require.NoError(t, err)
-// psrs := tt.PieceStateRuns()
-// assert.Len(t, psrs, 1)
-// assert.EqualValues(t, 3, psrs[0].Length)
-// assert.Equal(t, alreadyCompleted, psrs[0].Complete)
-// if alreadyCompleted {
-// r := tt.NewReader()
-// b, err := ioutil.ReadAll(r)
-// assert.NoError(t, err)
-// assert.EqualValues(t, testutil.GreetingFileContents, b)
-// }
-// }
-
-// func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
-// testAddTorrentPriorPieceCompletion(t, true)
-// }
-
-// func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
-// testAddTorrentPriorPieceCompletion(t, false)
-// }
+func writeTorrentData(ts storage.Torrent, info *metainfo.InfoEx, b []byte) {
+ for i := range iter.N(info.NumPieces()) {
+ n, err := ts.Piece(info.Piece(i)).WriteAt(b, 0)
+ b = b[n:]
+ log.Print(err)
+ }
+}
+
+func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool) {
+ fileCacheDir, err := ioutil.TempDir("", "")
+ require.NoError(t, err)
+ defer os.RemoveAll(fileCacheDir)
+ fileCache, err := filecache.NewCache(fileCacheDir)
+ require.NoError(t, err)
+ greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
+ defer os.RemoveAll(greetingDataTempDir)
+ filePieceStore := storage.NewPieceFileStorage(storage.FileCacheFileStore{fileCache})
+ greetingData, err := filePieceStore.OpenTorrent(&greetingMetainfo.Info)
+ require.NoError(t, err)
+ writeTorrentData(greetingData, &greetingMetainfo.Info, []byte(testutil.GreetingFileContents))
+ // require.Equal(t, len(testutil.GreetingFileContents), written)
+ // require.NoError(t, err)
+ for i := 0; i < greetingMetainfo.Info.NumPieces(); i++ {
+ p := greetingMetainfo.Info.Piece(i)
+ if alreadyCompleted {
+ err := greetingData.Piece(p).MarkComplete()
+ assert.NoError(t, err)
+ }
+ }
+ cfg := TestingConfig
+ // TODO: Disable network option?
+ cfg.DisableTCP = true
+ cfg.DisableUTP = true
+ cfg.DefaultStorage = filePieceStore
+ cl, err := NewClient(&cfg)
+ require.NoError(t, err)
+ defer cl.Close()
+ tt, err := cl.AddTorrent(greetingMetainfo)
+ require.NoError(t, err)
+ psrs := tt.PieceStateRuns()
+ assert.Len(t, psrs, 1)
+ assert.EqualValues(t, 3, psrs[0].Length)
+ assert.Equal(t, alreadyCompleted, psrs[0].Complete)
+ if alreadyCompleted {
+ r := tt.NewReader()
+ b, err := ioutil.ReadAll(r)
+ assert.NoError(t, err)
+ assert.EqualValues(t, testutil.GreetingFileContents, b)
+ }
+}
+
+func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
+ testAddTorrentPriorPieceCompletion(t, true)
+}
+
+func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
+ testAddTorrentPriorPieceCompletion(t, false)
+}
func TestAddMetainfoWithNodes(t *testing.T) {
cfg := TestingConfig
leecherDataDir, err := ioutil.TempDir("", "")
require.NoError(t, err)
defer os.RemoveAll(leecherDataDir)
- // cfg.TorrentDataOpener = func() TorrentDataOpener {
- // fc, err := filecache.NewCache(leecherDataDir)
- // require.NoError(t, err)
- // if ps.SetLeecherStorageCapacity {
- // fc.SetCapacity(ps.LeecherStorageCapacity)
- // }
- // store := pieceStore.New(fileCacheDataBackend.New(fc))
- // return func(mi *metainfo.Info) storage.I {
- // return store.OpenTorrentData(mi)
- // }
- // }()
+ fc, err := filecache.NewCache(leecherDataDir)
+ require.NoError(t, err)
+ if ps.SetLeecherStorageCapacity {
+ fc.SetCapacity(ps.LeecherStorageCapacity)
+ }
+ cfg.DefaultStorage = storage.NewPieceFileStorage(storage.FileCacheFileStore{fc})
cfg.DataDir = leecherDataDir
leecher, _ := NewClient(&cfg)
defer leecher.Close()
--- /dev/null
+package storage
+
+import (
+ "os"
+ "path"
+
+ "github.com/anacrolix/missinggo"
+ "github.com/anacrolix/missinggo/filecache"
+
+ "github.com/anacrolix/torrent/metainfo"
+)
+
+type FileStore interface {
+ OpenFile(path string, flags int) (File, error)
+ Stat(path string) (os.FileInfo, error)
+ Rename(from, to string) error
+}
+
+type File interface {
+ WriteAt([]byte, int64) (int, error)
+ ReadAt([]byte, int64) (int, error)
+ Close() error
+}
+
+type FileCacheFileStore struct {
+ *filecache.Cache
+}
+
+func (me FileCacheFileStore) OpenFile(p string, f int) (File, error) {
+ return me.Cache.OpenFile(p, f)
+}
+
+type pieceFileStorage struct {
+ fs FileStore
+}
+
+func NewPieceFileStorage(fs FileStore) I {
+ return &pieceFileStorage{
+ fs: fs,
+ }
+}
+
+type pieceFileTorrentStorage struct {
+ s *pieceFileStorage
+}
+
+func (me *pieceFileStorage) OpenTorrent(info *metainfo.InfoEx) (Torrent, error) {
+ return &pieceFileTorrentStorage{me}, nil
+}
+
+func (me *pieceFileTorrentStorage) Close() error {
+ return nil
+}
+
+func (me *pieceFileTorrentStorage) Piece(p metainfo.Piece) Piece {
+ return pieceFileTorrentStoragePiece{me, p, me.s.fs}
+}
+
+type pieceFileTorrentStoragePiece struct {
+ ts *pieceFileTorrentStorage
+ p metainfo.Piece
+ fs FileStore
+}
+
+func (me pieceFileTorrentStoragePiece) completedPath() string {
+ return path.Join("completed", me.p.Hash().HexString())
+}
+
+func (me pieceFileTorrentStoragePiece) incompletePath() string {
+ return path.Join("incomplete", me.p.Hash().HexString())
+}
+
+func (me pieceFileTorrentStoragePiece) GetIsComplete() bool {
+ fi, err := me.ts.s.fs.Stat(me.completedPath())
+ return err == nil && fi.Size() == me.p.Length()
+}
+
+func (me pieceFileTorrentStoragePiece) MarkComplete() error {
+ return me.fs.Rename(me.incompletePath(), me.completedPath())
+}
+
+func (me pieceFileTorrentStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
+ f, err := me.fs.OpenFile(me.completedPath(), os.O_RDONLY)
+ if err != nil {
+ f, err = me.fs.OpenFile(me.incompletePath(), os.O_RDONLY)
+ if err != nil {
+ return
+ }
+ }
+ defer f.Close()
+ return f.ReadAt(b, off)
+}
+
+func (me pieceFileTorrentStoragePiece) WriteAt(b []byte, off int64) (n int, err error) {
+ f, err := me.fs.OpenFile(me.incompletePath(), os.O_WRONLY|os.O_CREATE)
+ if err != nil {
+ return
+ }
+ defer f.Close()
+ missinggo.LimitLen(&b, me.p.Length()-off)
+ return f.WriteAt(b, off)
+}