Create a pieceStore interface, and merge in my httpfile backend, and replace data...
author    Matt Joiner <anacrolix@gmail.com>
          Sat, 3 Oct 2015 14:22:46 +0000 (00:22 +1000)
committer Matt Joiner <anacrolix@gmail.com>
          Sat, 3 Oct 2015 14:22:46 +0000 (00:22 +1000)
data/blob was aging and had severe performance problems. It's now possible to use missinggo/filecache as a data backend to pieceStore, which is better tested and performs excellently.
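
To make the new arrangement concrete, here is a minimal sketch of wiring a filecache-backed pieceStore into a client, mirroring the client_test.go hunk below. cacheDir is a placeholder; Config, Data and TorrentDataOpener are the client-side names this diff uses:

    package main

    import (
        "github.com/anacrolix/missinggo/filecache"

        "github.com/anacrolix/torrent"
        "github.com/anacrolix/torrent/data/pieceStore"
        fileCacheDataBackend "github.com/anacrolix/torrent/data/pieceStore/dataBackend/fileCache"
        "github.com/anacrolix/torrent/metainfo"
    )

    // storageFor builds a TorrentDataOpener whose torrents store their
    // pieces in a filecache rooted at cacheDir.
    func storageFor(cacheDir string) (torrent.TorrentDataOpener, error) {
        fc, err := filecache.NewCache(cacheDir)
        if err != nil {
            return nil, err
        }
        store := pieceStore.New(fileCacheDataBackend.New(fc))
        return func(info *metainfo.Info) torrent.Data {
            return store.OpenTorrentData(info)
        }, nil
    }

The returned opener is assigned to Config.TorrentDataOpener, as the test below does inline.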

client_test.go
data.go
data/blob/store.go [deleted file]
data/file/file.go
data/pieceStore/blob.go [moved from data/blob/blob.go with 70% similarity]
data/pieceStore/dataBackend/fileCache/backend.go [new file with mode: 0644]
data/pieceStore/dataBackend/http/backend.go [new file with mode: 0644]
data/pieceStore/dataBackend/i.go [new file with mode: 0644]
data/pieceStore/store.go [new file with mode: 0644]
torrent.go

diff --git a/client_test.go b/client_test.go
index a0f0f2a19bedeaa3b9dd4e4dc9cf7d076213fc62..d8707a9f6dbcbfff1c73849af9e9f924db0da800 100644 (file)
--- a/client_test.go
+++ b/client_test.go
@@ -16,13 +16,15 @@ import (
        _ "github.com/anacrolix/envpprof"
        "github.com/anacrolix/missinggo"
        . "github.com/anacrolix/missinggo"
+       "github.com/anacrolix/missinggo/filecache"
        "github.com/anacrolix/utp"
        "github.com/bradfitz/iter"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 
        "github.com/anacrolix/torrent/bencode"
-       "github.com/anacrolix/torrent/data/blob"
+       "github.com/anacrolix/torrent/data/pieceStore"
+       "github.com/anacrolix/torrent/data/pieceStore/dataBackend/fileCache"
        "github.com/anacrolix/torrent/dht"
        "github.com/anacrolix/torrent/internal/testutil"
        "github.com/anacrolix/torrent/iplist"
@@ -30,7 +32,7 @@ import (
 )
 
 func init() {
-       log.SetFlags(log.LstdFlags | log.Lshortfile)
+       log.SetFlags(log.LstdFlags | log.Llongfile)
 }
 
 var TestingConfig = Config{
@@ -266,10 +268,18 @@ func TestClientTransfer(t *testing.T) {
        // cfg.TorrentDataOpener = func(info *metainfo.Info) (data.Data, error) {
        //      return blob.TorrentData(info, leecherDataDir), nil
        // }
-       blobStore := blob.NewStore(leecherDataDir)
-       cfg.TorrentDataOpener = func(info *metainfo.Info) Data {
-               return blobStore.OpenTorrent(info)
-       }
+       // blobStore := blob.NewStore(leecherDataDir)
+       // cfg.TorrentDataOpener = func(info *metainfo.Info) Data {
+       //      return blobStore.OpenTorrent(info)
+       // }
+       cfg.TorrentDataOpener = func() TorrentDataOpener {
+               fc, err := filecache.NewCache(leecherDataDir)
+               require.NoError(t, err)
+               store := pieceStore.New(fileCacheDataBackend.New(fc))
+               return func(mi *metainfo.Info) Data {
+                       return store.OpenTorrentData(mi)
+               }
+       }()
        leecher, _ := NewClient(&cfg)
        defer leecher.Close()
        leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
diff --git a/data.go b/data.go
index 803d8476bbb0da4c1fc9bd38fdc462de7e9301af..9f8214fe6714cff2d1d9495767f104c77fd0a9ce 100644 (file)
--- a/data.go
+++ b/data.go
@@ -4,9 +4,11 @@ import "io"
 
 // Represents data storage for a Torrent.
 type Data interface {
-       ReadAt(p []byte, off int64) (n int, err error)
+       io.ReaderAt
+       io.WriterAt
+       // Bro, do you even io.Closer?
        Close()
-       WriteAt(p []byte, off int64) (n int, err error)
+       // If the data isn't available, err should be io.ErrUnexpectedEOF.
        WriteSectionTo(w io.Writer, off, n int64) (written int64, err error)
        // We believe the piece data will pass a hash check.
        PieceCompleted(index int) error
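
To illustrate the revised contract, a toy in-memory Data, as a sketch only: it implements just the methods visible in this hunk, and reports missing data as io.ErrUnexpectedEOF the way the comment above asks:

    package memdata

    import "io"

    // memData keeps the whole torrent in a single buffer.
    type memData struct {
        buf       []byte
        completed map[int]bool
    }

    func (m *memData) ReadAt(p []byte, off int64) (n int, err error) {
        if off >= int64(len(m.buf)) {
            return 0, io.EOF
        }
        n = copy(p, m.buf[off:])
        if n < len(p) {
            err = io.EOF
        }
        return
    }

    func (m *memData) WriteAt(p []byte, off int64) (int, error) {
        if off+int64(len(p)) > int64(len(m.buf)) {
            return 0, io.ErrShortWrite
        }
        return copy(m.buf[off:], p), nil
    }

    func (m *memData) Close() {}

    // Data that isn't there is io.ErrUnexpectedEOF, per the interface comment.
    func (m *memData) WriteSectionTo(w io.Writer, off, n int64) (int64, error) {
        if off < 0 || off+n > int64(len(m.buf)) {
            return 0, io.ErrUnexpectedEOF
        }
        written, err := w.Write(m.buf[off : off+n])
        return int64(written), err
    }

    func (m *memData) PieceCompleted(index int) error {
        if m.completed == nil {
            m.completed = make(map[int]bool)
        }
        m.completed[index] = true
        return nil
    }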
diff --git a/data/blob/store.go b/data/blob/store.go
deleted file mode 100644 (file)
index 99f696a..0000000
--- a/data/blob/store.go
+++ /dev/null
@@ -1,277 +0,0 @@
-// Implements torrent data storage as per-piece files.
-package blob
-
-import (
-       "bytes"
-       "crypto/sha1"
-       "encoding/hex"
-       "errors"
-       "fmt"
-       "io"
-       "os"
-       "path/filepath"
-       "sort"
-       "sync"
-       "time"
-
-       "github.com/anacrolix/missinggo"
-
-       "github.com/anacrolix/torrent/metainfo"
-)
-
-const (
-       filePerm = 0640
-       dirPerm  = 0750
-)
-
-type store struct {
-       baseDir  string
-       capacity int64
-
-       mu        sync.Mutex
-       completed map[[20]byte]struct{}
-}
-
-func (me *store) OpenTorrent(info *metainfo.Info) *data {
-       return &data{info, me}
-}
-
-type StoreOption func(*store)
-
-func Capacity(bytes int64) StoreOption {
-       return func(s *store) {
-               s.capacity = bytes
-       }
-}
-
-func NewStore(baseDir string, opt ...StoreOption) *store {
-       s := &store{baseDir, -1, sync.Mutex{}, nil}
-       for _, o := range opt {
-               o(s)
-       }
-       s.initCompleted()
-       return s
-}
-
-// Turns 40 byte hex string into its equivalent binary byte array.
-func hexStringPieceHashArray(s string) (ret [20]byte, ok bool) {
-       if len(s) != 40 {
-               return
-       }
-       n, err := hex.Decode(ret[:], []byte(s))
-       if err != nil {
-               return
-       }
-       if n != 20 {
-               panic(n)
-       }
-       ok = true
-       return
-}
-
-func (me *store) initCompleted() {
-       fis, err := me.readCompletedDir()
-       if err != nil {
-               panic(err)
-       }
-       me.mu.Lock()
-       me.completed = make(map[[20]byte]struct{}, len(fis))
-       for _, fi := range fis {
-               binHash, ok := hexStringPieceHashArray(fi.Name())
-               if !ok {
-                       continue
-               }
-               me.completed[binHash] = struct{}{}
-       }
-       me.mu.Unlock()
-}
-
-func (me *store) completePieceDirPath() string {
-       return filepath.Join(me.baseDir, "completed")
-}
-
-func (me *store) path(p metainfo.Piece, completed bool) string {
-       return filepath.Join(me.baseDir, func() string {
-               if completed {
-                       return "completed"
-               } else {
-                       return "incomplete"
-               }
-       }(), fmt.Sprintf("%x", p.Hash()))
-}
-
-func sliceToPieceHashArray(b []byte) (ret [20]byte) {
-       n := copy(ret[:], b)
-       if n != 20 {
-               panic(n)
-       }
-       return
-}
-
-func (me *store) pieceComplete(p metainfo.Piece) bool {
-       me.mu.Lock()
-       defer me.mu.Unlock()
-       _, ok := me.completed[sliceToPieceHashArray(p.Hash())]
-       return ok
-}
-
-func (me *store) pieceWrite(p metainfo.Piece) (f *os.File) {
-       if me.pieceComplete(p) {
-               return
-       }
-       name := me.path(p, false)
-       os.MkdirAll(filepath.Dir(name), dirPerm)
-       f, err := os.OpenFile(name, os.O_CREATE|os.O_WRONLY, filePerm)
-       if err != nil {
-               panic(err)
-       }
-       return
-}
-
-// Returns the file for the given piece, if it exists. It could be completed,
-// or incomplete.
-func (me *store) pieceRead(p metainfo.Piece) (f *os.File) {
-       f, err := os.Open(me.path(p, true))
-       if err == nil {
-               return
-       }
-       if !os.IsNotExist(err) {
-               panic(err)
-       }
-       // Mark the file not completed, in case we thought it was. TODO: Trigger
-       // an asynchronous initCompleted to reinitialize the entire completed map
-       // as there are likely other files missing.
-       me.mu.Lock()
-       delete(me.completed, sliceToPieceHashArray(p.Hash()))
-       me.mu.Unlock()
-       f, err = os.Open(me.path(p, false))
-       if err == nil {
-               return
-       }
-       if !os.IsNotExist(err) {
-               panic(err)
-       }
-       return
-}
-
-func (me *store) readCompletedDir() (fis []os.FileInfo, err error) {
-       f, err := os.Open(me.completePieceDirPath())
-       if err != nil {
-               if os.IsNotExist(err) {
-                       err = nil
-               }
-               return
-       }
-       fis, err = f.Readdir(-1)
-       f.Close()
-       return
-}
-
-func (me *store) removeCompleted(name string) (err error) {
-       err = os.Remove(filepath.Join(me.completePieceDirPath(), name))
-       if os.IsNotExist(err) {
-               err = nil
-       }
-       if err != nil {
-               return err
-       }
-       binHash, ok := hexStringPieceHashArray(name)
-       if ok {
-               me.mu.Lock()
-               delete(me.completed, binHash)
-               me.mu.Unlock()
-       }
-       return
-}
-
-type fileInfoSorter struct {
-       fis []os.FileInfo
-}
-
-func (me fileInfoSorter) Len() int {
-       return len(me.fis)
-}
-
-func lastTime(fi os.FileInfo) (ret time.Time) {
-       ret = fi.ModTime()
-       atime := missinggo.FileInfoAccessTime(fi)
-       if atime.After(ret) {
-               ret = atime
-       }
-       return
-}
-
-func (me fileInfoSorter) Less(i, j int) bool {
-       return lastTime(me.fis[i]).Before(lastTime(me.fis[j]))
-}
-
-func (me fileInfoSorter) Swap(i, j int) {
-       me.fis[i], me.fis[j] = me.fis[j], me.fis[i]
-}
-
-func sortFileInfos(fis []os.FileInfo) {
-       sorter := fileInfoSorter{fis}
-       sort.Sort(sorter)
-}
-
-func (me *store) makeSpace(space int64) error {
-       if me.capacity < 0 {
-               return nil
-       }
-       if space > me.capacity {
-               return errors.New("space requested exceeds capacity")
-       }
-       fis, err := me.readCompletedDir()
-       if err != nil {
-               return err
-       }
-       var size int64
-       for _, fi := range fis {
-               size += fi.Size()
-       }
-       sortFileInfos(fis)
-       for size > me.capacity-space {
-               me.removeCompleted(fis[0].Name())
-               size -= fis[0].Size()
-               fis = fis[1:]
-       }
-       return nil
-}
-
-func (me *store) PieceCompleted(p metainfo.Piece) (err error) {
-       err = me.makeSpace(p.Length())
-       if err != nil {
-               return
-       }
-       var (
-               incompletePiecePath = me.path(p, false)
-               completedPiecePath  = me.path(p, true)
-       )
-       fSrc, err := os.Open(incompletePiecePath)
-       if err != nil {
-               return
-       }
-       defer fSrc.Close()
-       os.MkdirAll(filepath.Dir(completedPiecePath), dirPerm)
-       fDst, err := os.OpenFile(completedPiecePath, os.O_EXCL|os.O_CREATE|os.O_WRONLY, filePerm)
-       if err != nil {
-               return
-       }
-       defer fDst.Close()
-       hasher := sha1.New()
-       r := io.TeeReader(io.LimitReader(fSrc, p.Length()), hasher)
-       _, err = io.Copy(fDst, r)
-       if err != nil {
-               return
-       }
-       if !bytes.Equal(hasher.Sum(nil), p.Hash()) {
-               err = errors.New("piece incomplete")
-               os.Remove(completedPiecePath)
-               return
-       }
-       os.Remove(incompletePiecePath)
-       me.mu.Lock()
-       me.completed[sliceToPieceHashArray(p.Hash())] = struct{}{}
-       me.mu.Unlock()
-       return
-}
diff --git a/data/file/file.go b/data/file/file.go
index bf77db2adf5ffcbdf800425bfe956f6ee2e2c87b..a24f4ebef2ad2934671b3cd9ea1cca296d92b407 100644 (file)
--- a/data/file/file.go
+++ b/data/file/file.go
@@ -103,6 +103,9 @@ func (me data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err err
                }
                var f *os.File
                f, err = os.Open(me.fileInfoName(fi))
+               if os.IsNotExist(err) {
+                       err = io.ErrUnexpectedEOF
+               }
                if err != nil {
                        return
                }
diff --git a/data/blob/blob.go b/data/pieceStore/blob.go
similarity index 70%
rename from data/blob/blob.go
rename to data/pieceStore/blob.go
index 0afc047bc652343aeb3eed91af5401711e940479..0ebdea256531dc80423baed559829bd338fbf09c 100644 (file)
--- a/data/blob/blob.go
+++ b/data/pieceStore/blob.go
@@ -1,9 +1,8 @@
-package blob
+package pieceStore
 
 import (
        "encoding/hex"
        "io"
-       "log"
 
        "github.com/anacrolix/torrent/metainfo"
 )
@@ -19,6 +18,10 @@ func (me *data) pieceHashHex(i int) string {
 
 func (me *data) Close() {}
 
+// TODO: Make sure that reading completed can't read from incomplete. Then
+// also it'll be possible to verify that the Content-Range on completed
+// returns the correct piece length so there aren't short reads.
+
 func (me *data) ReadAt(b []byte, off int64) (n int, err error) {
        for len(b) != 0 {
                if off >= me.info.TotalLength() {
@@ -26,27 +29,16 @@ func (me *data) ReadAt(b []byte, off int64) (n int, err error) {
                        break
                }
                p := me.info.Piece(int(off / me.info.PieceLength))
-               f := me.store.pieceRead(p)
-               if f == nil {
-                       log.Println("piece not found", p)
-                       err = io.ErrUnexpectedEOF
-                       break
-               }
                b1 := b
                maxN1 := int(p.Length() - off%me.info.PieceLength)
                if len(b1) > maxN1 {
                        b1 = b1[:maxN1]
                }
                var n1 int
-               n1, err = f.ReadAt(b1, off%me.info.PieceLength)
-               f.Close()
+               n1, err = me.store.pieceReadAt(p, b1, off%me.info.PieceLength)
                n += n1
                off += int64(n1)
                b = b[n1:]
-               if err == io.EOF {
-                       err = nil
-                       break
-               }
                if err != nil {
                        break
                }
@@ -54,19 +46,18 @@ func (me *data) ReadAt(b []byte, off int64) (n int, err error) {
        return
 }
 
+// TODO: Rewrite this later; on short writes to a piece, it will start to play up.
 func (me *data) WriteAt(p []byte, off int64) (n int, err error) {
        i := int(off / me.info.PieceLength)
        off %= me.info.PieceLength
        for len(p) != 0 {
-               f := me.store.pieceWrite(me.info.Piece(i))
                p1 := p
                maxN := me.info.Piece(i).Length() - off
                if int64(len(p1)) > maxN {
                        p1 = p1[:maxN]
                }
                var n1 int
-               n1, err = f.WriteAt(p1, off)
-               f.Close()
+               n1, err = me.store.pieceWriteAt(me.info.Piece(i), p1, off)
                n += n1
                if err != nil {
                        return
@@ -78,27 +69,23 @@ func (me *data) WriteAt(p []byte, off int64) (n int, err error) {
        return
 }
 
-func (me *data) pieceReader(piece int, off int64) (ret io.ReadCloser, err error) {
-       f := me.store.pieceRead(me.info.Piece(piece))
-       if f == nil {
-               err = io.ErrUnexpectedEOF
-               return
-       }
-       return struct {
-               io.Reader
-               io.Closer
-       }{
-               Reader: io.NewSectionReader(f, off, me.info.Piece(piece).Length()-off),
-               Closer: f,
-       }, nil
+func (me *data) pieceReader(p metainfo.Piece, off int64) (ret io.ReadCloser, err error) {
+       return me.store.getPieceRange(p, off, p.Length()-off)
 }
 
 func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) {
        i := int(off / me.info.PieceLength)
        off %= me.info.PieceLength
        for n != 0 {
+               if i >= me.info.NumPieces() {
+                       break
+               }
+               p := me.info.Piece(i)
+               if off >= p.Length() {
+                       break
+               }
                var pr io.ReadCloser
-               pr, err = me.pieceReader(i, off)
+               pr, err = me.pieceReader(p, off)
                if err != nil {
                        return
                }
diff --git a/data/pieceStore/dataBackend/fileCache/backend.go b/data/pieceStore/dataBackend/fileCache/backend.go
new file mode 100644 (file)
index 0000000..701ae4a
--- /dev/null
+++ b/data/pieceStore/dataBackend/fileCache/backend.go
@@ -0,0 +1,63 @@
+package fileCacheDataBackend
+
+import (
+       "io"
+       "os"
+
+       "github.com/anacrolix/missinggo/filecache"
+
+       "github.com/anacrolix/torrent/data/pieceStore/dataBackend"
+)
+
+type backend struct {
+       c *filecache.Cache
+}
+
+func New(fc *filecache.Cache) *backend {
+       return &backend{
+               c: fc,
+       }
+}
+
+var _ dataBackend.I = &backend{}
+
+func (me *backend) Delete(path string) (err error) {
+       err = me.c.Remove(path)
+       return
+}
+
+func (me *backend) GetLength(path string) (ret int64, err error) {
+       f, err := me.c.OpenFile(path, 0)
+       if os.IsNotExist(err) {
+               err = dataBackend.ErrNotFound
+       }
+       if err != nil {
+               return
+       }
+       defer f.Close()
+       ret, err = f.Seek(0, os.SEEK_END)
+       return
+}
+
+func (me *backend) Open(path string) (ret dataBackend.File, err error) {
+       ret, err = me.c.OpenFile(path, os.O_RDWR|os.O_CREATE)
+       return
+}
+
+func (me *backend) OpenSection(path string, off, n int64) (ret io.ReadCloser, err error) {
+       f, err := me.c.OpenFile(path, os.O_RDONLY)
+       if os.IsNotExist(err) {
+               err = dataBackend.ErrNotFound
+       }
+       if err != nil {
+               return
+       }
+       ret = struct {
+               io.Reader
+               io.Closer
+       }{
+               io.NewSectionReader(f, off, n),
+               f,
+       }
+       return
+}
diff --git a/data/pieceStore/dataBackend/http/backend.go b/data/pieceStore/dataBackend/http/backend.go
new file mode 100644 (file)
index 0000000..85fc784
--- /dev/null
+++ b/data/pieceStore/dataBackend/http/backend.go
@@ -0,0 +1,65 @@
+package httpDataBackend
+
+import (
+       "io"
+       "net/http"
+       "net/url"
+       "path"
+
+       "github.com/anacrolix/missinggo"
+       "github.com/anacrolix/missinggo/httpfile"
+
+       "github.com/anacrolix/torrent/data/pieceStore/dataBackend"
+)
+
+var client = http.DefaultClient
+
+type backend struct {
+       // Backend URL.
+       url url.URL
+}
+
+func New(u url.URL) *backend {
+       return &backend{
+               url: *missinggo.CopyURL(&u),
+       }
+}
+
+var _ dataBackend.I = &backend{}
+
+func fixErrNotFound(err error) error {
+       if err == httpfile.ErrNotFound {
+               return dataBackend.ErrNotFound
+       }
+       return err
+}
+
+func (me *backend) urlStr(_path string) string {
+       u := me.url
+       u.Path = path.Join(u.Path, _path)
+       return u.String()
+}
+
+func (me *backend) Delete(path string) (err error) {
+       err = httpfile.Delete(me.urlStr(path))
+       err = fixErrNotFound(err)
+       return
+}
+
+func (me *backend) GetLength(path string) (ret int64, err error) {
+       ret, err = httpfile.GetLength(me.urlStr(path))
+       err = fixErrNotFound(err)
+       return
+}
+
+func (me *backend) Open(path string) (ret dataBackend.File, err error) {
+       ret = httpfile.Open(me.urlStr(path))
+       err = fixErrNotFound(err)
+       return
+}
+
+func (me *backend) OpenSection(path string, off, n int64) (ret io.ReadCloser, err error) {
+       ret, err = httpfile.OpenSectionReader(me.urlStr(path), off, n)
+       err = fixErrNotFound(err)
+       return
+}
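
A hypothetical caller of the HTTP backend, for illustration only. The base URL and the piece path are placeholders, and missing paths come back as dataBackend.ErrNotFound thanks to fixErrNotFound:

    package main

    import (
        "io/ioutil"
        "log"
        "net/url"

        "github.com/anacrolix/torrent/data/pieceStore/dataBackend"
        httpDataBackend "github.com/anacrolix/torrent/data/pieceStore/dataBackend/http"
    )

    func main() {
        u, err := url.Parse("http://localhost:8080/pieces") // placeholder
        if err != nil {
            log.Fatal(err)
        }
        be := httpDataBackend.New(*u)
        // The 40-hex piece hash in the path is a placeholder too.
        rc, err := be.OpenSection("completed/0000000000000000000000000000000000000000", 0, 1<<14)
        if err == dataBackend.ErrNotFound {
            log.Fatal("piece not on the backend")
        }
        if err != nil {
            log.Fatal(err)
        }
        defer rc.Close()
        b, err := ioutil.ReadAll(rc)
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("read %d bytes", len(b))
    }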
diff --git a/data/pieceStore/dataBackend/i.go b/data/pieceStore/dataBackend/i.go
new file mode 100644 (file)
index 0000000..6b5beb9
--- /dev/null
+++ b/data/pieceStore/dataBackend/i.go
@@ -0,0 +1,22 @@
+package dataBackend
+
+import (
+       "errors"
+       "io"
+)
+
+// All functions must return ErrNotFound when the named path doesn't exist.
+type I interface {
+       GetLength(path string) (int64, error)
+       Open(path string) (File, error)
+       OpenSection(path string, off, n int64) (io.ReadCloser, error)
+       Delete(path string) error
+}
+
+var ErrNotFound = errors.New("not found")
+
+type File interface {
+       io.Closer
+       io.Seeker
+       io.Writer
+}
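
For illustration, a toy dataBackend.I over a local directory. The real implementations in this commit are the fileCache and http backends; this sketch just shows the ErrNotFound mapping the comment calls for:

    package osbackend

    import (
        "io"
        "os"
        "path/filepath"

        "github.com/anacrolix/torrent/data/pieceStore/dataBackend"
    )

    type osBackend struct{ root string }

    var _ dataBackend.I = osBackend{}

    func (me osBackend) GetLength(p string) (int64, error) {
        fi, err := os.Stat(filepath.Join(me.root, p))
        if os.IsNotExist(err) {
            return 0, dataBackend.ErrNotFound
        }
        if err != nil {
            return 0, err
        }
        return fi.Size(), nil
    }

    func (me osBackend) Open(p string) (dataBackend.File, error) {
        full := filepath.Join(me.root, p)
        os.MkdirAll(filepath.Dir(full), 0750)
        f, err := os.OpenFile(full, os.O_RDWR|os.O_CREATE, 0640)
        if err != nil {
            return nil, err
        }
        return f, nil
    }

    func (me osBackend) OpenSection(p string, off, n int64) (io.ReadCloser, error) {
        f, err := os.Open(filepath.Join(me.root, p))
        if os.IsNotExist(err) {
            return nil, dataBackend.ErrNotFound
        }
        if err != nil {
            return nil, err
        }
        return struct {
            io.Reader
            io.Closer
        }{io.NewSectionReader(f, off, n), f}, nil
    }

    func (me osBackend) Delete(p string) error {
        err := os.Remove(filepath.Join(me.root, p))
        if os.IsNotExist(err) {
            return dataBackend.ErrNotFound
        }
        return err
    }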
diff --git a/data/pieceStore/store.go b/data/pieceStore/store.go
new file mode 100644 (file)
index 0000000..3b1970f
--- /dev/null
+++ b/data/pieceStore/store.go
@@ -0,0 +1,292 @@
+package pieceStore
+
+import (
+       "bytes"
+       "crypto/sha1"
+       "encoding/hex"
+       "errors"
+       "io"
+       "log"
+       "os"
+       "path"
+       "strconv"
+       "sync"
+       "time"
+
+       "github.com/anacrolix/torrent/data/pieceStore/dataBackend"
+       "github.com/anacrolix/torrent/metainfo"
+)
+
+type store struct {
+       db dataBackend.I
+       // Limit backend requests.
+       requestPool chan struct{}
+
+       mu sync.Mutex
+       // The cached completion state for pieces.
+       completion map[[20]byte]bool
+       lastError  time.Time
+}
+
+func (me *store) completedPiecePath(p metainfo.Piece) string {
+       return path.Join("completed", hex.EncodeToString(p.Hash()))
+}
+
+func (me *store) incompletePiecePath(p metainfo.Piece) string {
+       return path.Join(
+               "incomplete",
+               strconv.FormatInt(int64(os.Getpid()), 10),
+               hex.EncodeToString(p.Hash()))
+}
+
+func (me *store) OpenTorrentData(info *metainfo.Info) *data {
+       return &data{info, me}
+}
+
+func New(db dataBackend.I) *store {
+       s := &store{
+               db:          db,
+               requestPool: make(chan struct{}, 5),
+       }
+       return s
+}
+
+// Turns 40 byte hex string into its equivalent binary byte array.
+func hexStringPieceHashArray(s string) (ret [20]byte, ok bool) {
+       if len(s) != 40 {
+               return
+       }
+       n, err := hex.Decode(ret[:], []byte(s))
+       if err != nil {
+               return
+       }
+       if n != 20 {
+               panic(n)
+       }
+       ok = true
+       return
+}
+
+func sliceToPieceHashArray(b []byte) (ret [20]byte) {
+       n := copy(ret[:], b)
+       if n != 20 {
+               panic(n)
+       }
+       return
+}
+
+func pieceHashArray(p metainfo.Piece) [20]byte {
+       return sliceToPieceHashArray(p.Hash())
+}
+
+func (me *store) completionKnown(p metainfo.Piece) bool {
+       me.mu.Lock()
+       _, ok := me.completion[pieceHashArray(p)]
+       me.mu.Unlock()
+       return ok
+}
+
+func (me *store) isComplete(p metainfo.Piece) bool {
+       me.mu.Lock()
+       ret, _ := me.completion[pieceHashArray(p)]
+       me.mu.Unlock()
+       return ret
+}
+
+func (me *store) setCompletion(p metainfo.Piece, complete bool) {
+       me.mu.Lock()
+       if me.completion == nil {
+               me.completion = make(map[[20]byte]bool)
+       }
+       me.completion[pieceHashArray(p)] = complete
+       me.mu.Unlock()
+}
+
+func (me *store) pieceComplete(p metainfo.Piece) bool {
+       if me.completionKnown(p) {
+               return me.isComplete(p)
+       }
+       // Prevent errors from stalling the caller.
+       if !me.lastError.IsZero() && time.Since(me.lastError) < time.Second {
+               return false
+       }
+       me.requestPool <- struct{}{}
+       defer func() {
+               <-me.requestPool
+       }()
+       length, err := me.db.GetLength(me.completedPiecePath(p))
+       if err == dataBackend.ErrNotFound {
+               me.setCompletion(p, false)
+               return false
+       }
+       if err != nil {
+               me.lastError = time.Now()
+               log.Printf("%+v", err)
+               return false
+       }
+       complete := length == p.Length()
+       if !complete {
+               log.Printf("completed piece %x has wrong length: %d", p.Hash(), length)
+       }
+       me.setCompletion(p, complete)
+       return complete
+}
+
+func (me *store) pieceWriteAt(p metainfo.Piece, b []byte, off int64) (n int, err error) {
+       if me.pieceComplete(p) {
+               err = errors.New("already have piece")
+               return
+       }
+       me.requestPool <- struct{}{}
+       defer func() {
+               <-me.requestPool
+       }()
+       f, err := me.db.Open(me.incompletePiecePath(p))
+       if err != nil {
+               return
+       }
+       defer func() {
+               closeErr := f.Close()
+               if err == nil {
+                       err = closeErr
+               }
+       }()
+       _, err = f.Seek(off, os.SEEK_SET)
+       if err != nil {
+               return
+       }
+       n, err = f.Write(b)
+       return
+}
+
+// Wraps a Closer, releases a slot from a channel pool the first time Close is
+// called.
+type poolCloser struct {
+       mu       sync.Mutex
+       released bool
+       pool     <-chan struct{}
+       io.Closer
+}
+
+func (me *poolCloser) Close() (err error) {
+       err = me.Closer.Close()
+       me.mu.Lock()
+       if !me.released {
+               <-me.pool
+               me.released = true
+       }
+       me.mu.Unlock()
+       return
+}
+
+func (me *store) forgetCompletions() {
+       me.mu.Lock()
+       me.completion = nil
+       me.mu.Unlock()
+}
+
+func (me *store) getPieceRange(p metainfo.Piece, off, n int64) (ret io.ReadCloser, err error) {
+       me.requestPool <- struct{}{}
+       rc, err := me.db.OpenSection(me.completedPiecePath(p), off, n)
+       if err == dataBackend.ErrNotFound {
+               if me.isComplete(p) {
+                       me.forgetCompletions()
+               }
+               me.setCompletion(p, false)
+               rc, err = me.db.OpenSection(me.incompletePiecePath(p), off, n)
+       }
+       if err == dataBackend.ErrNotFound {
+               <-me.requestPool
+               err = io.ErrUnexpectedEOF
+               return
+       }
+       if err != nil {
+               <-me.requestPool
+               return
+       }
+       // Wrap up the response body so that the request slot is released when the
+       // response body is closed.
+       ret = struct {
+               io.Reader
+               io.Closer
+       }{
+               rc,
+               &poolCloser{
+                       pool:   me.requestPool,
+                       Closer: rc,
+               },
+       }
+       return
+}
+
+func (me *store) pieceReadAt(p metainfo.Piece, b []byte, off int64) (n int, err error) {
+       rc, err := me.getPieceRange(p, off, int64(len(b)))
+       if err != nil {
+               return
+       }
+       defer rc.Close()
+       n, err = io.ReadFull(rc, b)
+       if err == io.EOF {
+               err = io.ErrUnexpectedEOF
+       }
+       return
+}
+
+func (me *store) removePath(path string) (err error) {
+       me.requestPool <- struct{}{}
+       defer func() {
+               <-me.requestPool
+       }()
+       err = me.db.Delete(path)
+       return
+}
+
+// Remove the completed piece if it exists, and mark the piece not completed.
+// Mustn't fail.
+func (me *store) deleteCompleted(p metainfo.Piece) {
+       if err := me.removePath(me.completedPiecePath(p)); err != nil {
+               panic(err)
+       }
+       me.setCompletion(p, false)
+}
+
+func (me *store) hashCopyFile(from, to string, n int64) (hash []byte, err error) {
+       // Yes, 2 requests occur here simultaneously, but we're not trying to be
+       // pedantic.
+       me.requestPool <- struct{}{}
+       defer func() {
+               <-me.requestPool
+       }()
+       src, err := me.db.OpenSection(from, 0, n)
+       if err != nil {
+               return
+       }
+       defer src.Close()
+       hasher := sha1.New()
+       tee := io.TeeReader(src, hasher)
+       dest, err := me.db.Open(to)
+       if err != nil {
+               return
+       }
+       defer dest.Close()
+       _, err = io.Copy(dest, tee)
+       if err != nil {
+               return
+       }
+       hash = hasher.Sum(nil)
+       return
+}
+
+func (me *store) PieceCompleted(p metainfo.Piece) (err error) {
+       hash, err := me.hashCopyFile(me.incompletePiecePath(p), me.completedPiecePath(p), p.Length())
+       if err == nil && !bytes.Equal(hash, p.Hash()) {
+               err = errors.New("piece incomplete")
+       }
+       if err != nil {
+               me.deleteCompleted(p)
+               return
+       }
+       me.removePath(me.incompletePiecePath(p))
+       me.setCompletion(p, true)
+       return
+}
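
store.requestPool is a counting semaphore: a buffered channel where a send acquires one of five slots and a receive releases one. Reads are the subtle case, because the slot must stay held until the caller is done with the returned body, which is what poolCloser is for. The same pattern in isolation (all names here are illustrative, not from the commit):

    package poolpattern

    import (
        "io"
        "sync"
    )

    var sem = make(chan struct{}, 5) // at most 5 concurrent backend requests

    // openLimited acquires a slot, opens a reader, and hands the slot to the
    // caller, who releases it by closing the reader.
    func openLimited(open func() (io.ReadCloser, error)) (io.ReadCloser, error) {
        sem <- struct{}{} // acquire
        rc, err := open()
        if err != nil {
            <-sem // nothing was opened; release immediately
            return nil, err
        }
        return &slotCloser{rc: rc}, nil
    }

    // slotCloser releases the slot at most once, like poolCloser's
    // released flag.
    type slotCloser struct {
        rc   io.ReadCloser
        once sync.Once
    }

    func (s *slotCloser) Read(p []byte) (int, error) { return s.rc.Read(p) }

    func (s *slotCloser) Close() error {
        err := s.rc.Close()
        s.once.Do(func() { <-sem })
        return err
    }
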
diff --git a/torrent.go b/torrent.go
index cb130007c774ef8dd454c0cdec6e14a917097dd2..e1faf0ae3217d6e870c6ed603280726f4d265fbb 100644 (file)
--- a/torrent.go
+++ b/torrent.go
@@ -662,7 +662,17 @@ func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) {
                p.noPendingWrites.Wait()
        }
        p.pendingWritesMutex.Unlock()
-       t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
+       pl := t.Info.Piece(int(piece)).Length()
+       n, err := t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, pl)
+       if err != nil {
+               if err != io.ErrUnexpectedEOF {
+                       log.Printf("error hashing piece with %T: %s", t.data, err)
+               }
+               return
+       }
+       if n != pl {
+               panic("lame")
+       }
        missinggo.CopyExact(ps[:], hash.Sum(nil))
        return
 }
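
Using t.Info.Piece(int(piece)).Length() rather than the nominal t.Info.PieceLength matters for the final piece, which is usually shorter. A quick worked example (numbers are illustrative):

    package main

    import "fmt"

    func main() {
        totalLength := int64(5 << 19) // 2.5 MiB torrent
        pieceLength := int64(1 << 20) // 1 MiB pieces
        numPieces := (totalLength + pieceLength - 1) / pieceLength
        lastLen := totalLength - pieceLength*(numPieces-1)
        fmt.Println(numPieces, lastLen) // 3 524288: the last piece is 512 KiB
        // Hashing the last piece with the nominal 1 MiB length would demand
        // bytes past the end of the torrent; with the per-piece length, a
        // short read now only means data is genuinely missing, surfacing as
        // io.ErrUnexpectedEOF above.
    }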