From: Matt Joiner <anacrolix@gmail.com>
Date: Tue, 29 Mar 2016 00:14:34 +0000 (+1100)
Subject: Reimplement piece storage
X-Git-Tag: v1.0.0~811
X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=ee22446440d923182ea55b801fc0cffbf18c8bee;p=btrtrc.git

Reimplement piece storage
---

diff --git a/client_test.go b/client_test.go
index faded767..597b67b7 100644
--- a/client_test.go
+++ b/client_test.go
@@ -19,6 +19,7 @@ import (
 	_ "github.com/anacrolix/envpprof"
 	"github.com/anacrolix/missinggo"
 	. "github.com/anacrolix/missinggo"
+	"github.com/anacrolix/missinggo/filecache"
 	"github.com/anacrolix/utp"
 	"github.com/bradfitz/iter"
 	"github.com/stretchr/testify/assert"
@@ -310,17 +311,12 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
 	leecherDataDir, err := ioutil.TempDir("", "")
 	require.NoError(t, err)
 	defer os.RemoveAll(leecherDataDir)
-	// cfg.TorrentDataOpener = func() TorrentDataOpener {
-	// 	fc, err := filecache.NewCache(leecherDataDir)
-	// 	require.NoError(t, err)
-	// 	if ps.SetLeecherStorageCapacity {
-	// 		fc.SetCapacity(ps.LeecherStorageCapacity)
-	// 	}
-	// 	store := pieceStore.New(fileCacheDataBackend.New(fc))
-	// 	return func(mi *metainfo.Info) storage.I {
-	// 		return store.OpenTorrentData(mi)
-	// 	}
-	// }()
+	fc, err := filecache.NewCache(leecherDataDir)
+	require.NoError(t, err)
+	if ps.SetLeecherStorageCapacity {
+		fc.SetCapacity(ps.LeecherStorageCapacity)
+	}
+	cfg.DefaultStorage = storage.NewPieceFileStorage(storage.FileCacheFileStore{fc})
 	leecher, err := NewClient(&cfg)
 	require.NoError(t, err)
 	defer leecher.Close()
@@ -719,55 +715,64 @@ func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
 	}
 }
 
-// func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool) {
-// 	fileCacheDir, err := ioutil.TempDir("", "")
-// 	require.NoError(t, err)
-// 	defer os.RemoveAll(fileCacheDir)
-// 	fileCache, err := filecache.NewCache(fileCacheDir)
-// 	require.NoError(t, err)
-// 	greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
-// 	defer os.RemoveAll(greetingDataTempDir)
-// 	filePieceStore := pieceStore.New(fileCacheDataBackend.New(fileCache))
-// 	greetingData := filePieceStore.OpenTorrentData(&greetingMetainfo.Info.Info)
-// 	written, err := greetingData.WriteAt([]byte(testutil.GreetingFileContents), 0)
-// 	require.Equal(t, len(testutil.GreetingFileContents), written)
-// 	require.NoError(t, err)
-// 	for i := 0; i < greetingMetainfo.Info.NumPieces(); i++ {
-// 		// p := greetingMetainfo.Info.Piece(i)
-// 		if alreadyCompleted {
-// 			err := greetingData.PieceCompleted(i)
-// 			assert.NoError(t, err)
-// 		}
-// 	}
-// 	cfg := TestingConfig
-// 	// TODO: Disable network option?
-// 	cfg.DisableTCP = true
-// 	cfg.DisableUTP = true
-// 	// cfg.DefaultStorage = filePieceStore
-// 	cl, err := NewClient(&cfg)
-// 	require.NoError(t, err)
-// 	defer cl.Close()
-// 	tt, err := cl.AddTorrent(greetingMetainfo)
-// 	require.NoError(t, err)
-// 	psrs := tt.PieceStateRuns()
-// 	assert.Len(t, psrs, 1)
-// 	assert.EqualValues(t, 3, psrs[0].Length)
-// 	assert.Equal(t, alreadyCompleted, psrs[0].Complete)
-// 	if alreadyCompleted {
-// 		r := tt.NewReader()
-// 		b, err := ioutil.ReadAll(r)
-// 		assert.NoError(t, err)
-// 		assert.EqualValues(t, testutil.GreetingFileContents, b)
-// 	}
-// }
-
-// func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
-// 	testAddTorrentPriorPieceCompletion(t, true)
-// }
-
-// func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
-// 	testAddTorrentPriorPieceCompletion(t, false)
-// }
+func writeTorrentData(ts storage.Torrent, info *metainfo.InfoEx, b []byte) {
+	for i := range iter.N(info.NumPieces()) {
+		n, err := ts.Piece(info.Piece(i)).WriteAt(b, 0)
+		b = b[n:]
+		log.Print(err)
+	}
+}
+
+func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool) {
+	fileCacheDir, err := ioutil.TempDir("", "")
+	require.NoError(t, err)
+	defer os.RemoveAll(fileCacheDir)
+	fileCache, err := filecache.NewCache(fileCacheDir)
+	require.NoError(t, err)
+	greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
+	defer os.RemoveAll(greetingDataTempDir)
+	filePieceStore := storage.NewPieceFileStorage(storage.FileCacheFileStore{fileCache})
+	greetingData, err := filePieceStore.OpenTorrent(&greetingMetainfo.Info)
+	require.NoError(t, err)
+	writeTorrentData(greetingData, &greetingMetainfo.Info, []byte(testutil.GreetingFileContents))
+	// require.Equal(t, len(testutil.GreetingFileContents), written)
+	// require.NoError(t, err)
+	for i := 0; i < greetingMetainfo.Info.NumPieces(); i++ {
+		p := greetingMetainfo.Info.Piece(i)
+		if alreadyCompleted {
+			err := greetingData.Piece(p).MarkComplete()
+			assert.NoError(t, err)
+		}
+	}
+	cfg := TestingConfig
+	// TODO: Disable network option?
+	cfg.DisableTCP = true
+	cfg.DisableUTP = true
+	cfg.DefaultStorage = filePieceStore
+	cl, err := NewClient(&cfg)
+	require.NoError(t, err)
+	defer cl.Close()
+	tt, err := cl.AddTorrent(greetingMetainfo)
+	require.NoError(t, err)
+	psrs := tt.PieceStateRuns()
+	assert.Len(t, psrs, 1)
+	assert.EqualValues(t, 3, psrs[0].Length)
+	assert.Equal(t, alreadyCompleted, psrs[0].Complete)
+	if alreadyCompleted {
+		r := tt.NewReader()
+		b, err := ioutil.ReadAll(r)
+		assert.NoError(t, err)
+		assert.EqualValues(t, testutil.GreetingFileContents, b)
+	}
+}
+
+func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
+	testAddTorrentPriorPieceCompletion(t, true)
+}
+
+func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
+	testAddTorrentPriorPieceCompletion(t, false)
+}
 
 func TestAddMetainfoWithNodes(t *testing.T) {
 	cfg := TestingConfig
@@ -812,17 +817,12 @@ func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
 	leecherDataDir, err := ioutil.TempDir("", "")
 	require.NoError(t, err)
 	defer os.RemoveAll(leecherDataDir)
-	// cfg.TorrentDataOpener = func() TorrentDataOpener {
-	// 	fc, err := filecache.NewCache(leecherDataDir)
-	// 	require.NoError(t, err)
-	// 	if ps.SetLeecherStorageCapacity {
-	// 		fc.SetCapacity(ps.LeecherStorageCapacity)
-	// 	}
-	// 	store := pieceStore.New(fileCacheDataBackend.New(fc))
-	// 	return func(mi *metainfo.Info) storage.I {
-	// 		return store.OpenTorrentData(mi)
-	// 	}
-	// }()
+	fc, err := filecache.NewCache(leecherDataDir)
+	require.NoError(t, err)
+	if ps.SetLeecherStorageCapacity {
+		fc.SetCapacity(ps.LeecherStorageCapacity)
+	}
+	cfg.DefaultStorage = storage.NewPieceFileStorage(storage.FileCacheFileStore{fc})
 	cfg.DataDir = leecherDataDir
 	leecher, _ := NewClient(&cfg)
 	defer leecher.Close()
diff --git a/storage/piece_file.go b/storage/piece_file.go
new file mode 100644
index 00000000..0e323751
--- /dev/null
+++ b/storage/piece_file.go
@@ -0,0 +1,102 @@
+package storage
+
+import (
+	"os"
+	"path"
+
+	"github.com/anacrolix/missinggo"
+	"github.com/anacrolix/missinggo/filecache"
+
+	"github.com/anacrolix/torrent/metainfo"
+)
+
+type FileStore interface {
+	OpenFile(path string, flags int) (File, error)
+	Stat(path string) (os.FileInfo, error)
+	Rename(from, to string) error
+}
+
+type File interface {
+	WriteAt([]byte, int64) (int, error)
+	ReadAt([]byte, int64) (int, error)
+	Close() error
+}
+
+type FileCacheFileStore struct {
+	*filecache.Cache
+}
+
+func (me FileCacheFileStore) OpenFile(p string, f int) (File, error) {
+	return me.Cache.OpenFile(p, f)
+}
+
+type pieceFileStorage struct {
+	fs FileStore
+}
+
+func NewPieceFileStorage(fs FileStore) I {
+	return &pieceFileStorage{
+		fs: fs,
+	}
+}
+
+type pieceFileTorrentStorage struct {
+	s *pieceFileStorage
+}
+
+func (me *pieceFileStorage) OpenTorrent(info *metainfo.InfoEx) (Torrent, error) {
+	return &pieceFileTorrentStorage{me}, nil
+}
+
+func (me *pieceFileTorrentStorage) Close() error {
+	return nil
+}
+
+func (me *pieceFileTorrentStorage) Piece(p metainfo.Piece) Piece {
+	return pieceFileTorrentStoragePiece{me, p, me.s.fs}
+}
+
+type pieceFileTorrentStoragePiece struct {
+	ts *pieceFileTorrentStorage
+	p  metainfo.Piece
+	fs FileStore
+}
+
+func (me pieceFileTorrentStoragePiece) completedPath() string {
+	return path.Join("completed", me.p.Hash().HexString())
+}
+
+func (me pieceFileTorrentStoragePiece) incompletePath() string {
+	return path.Join("incomplete", me.p.Hash().HexString())
+}
+
+func (me pieceFileTorrentStoragePiece) GetIsComplete() bool {
+	fi, err := me.ts.s.fs.Stat(me.completedPath())
+	return err == nil && fi.Size() == me.p.Length()
+}
+
+func (me pieceFileTorrentStoragePiece) MarkComplete() error {
+	return me.fs.Rename(me.incompletePath(), me.completedPath())
+}
+
+func (me pieceFileTorrentStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
+	f, err := me.fs.OpenFile(me.completedPath(), os.O_RDONLY)
+	if err != nil {
+		f, err = me.fs.OpenFile(me.incompletePath(), os.O_RDONLY)
+		if err != nil {
+			return
+		}
+	}
+	defer f.Close()
+	return f.ReadAt(b, off)
+}
+
+func (me pieceFileTorrentStoragePiece) WriteAt(b []byte, off int64) (n int, err error) {
+	f, err := me.fs.OpenFile(me.incompletePath(), os.O_WRONLY|os.O_CREATE)
+	if err != nil {
+		return
+	}
+	defer f.Close()
+	missinggo.LimitLen(&b, me.p.Length()-off)
+	return f.WriteAt(b, off)
+}