From: Matt Joiner Date: Sat, 3 Oct 2015 14:22:46 +0000 (+1000) Subject: Create a pieceStore interface, and merge in my httpfile backend, and replace data... X-Git-Tag: v1.0.0~1023 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=b0b57948902f85cb3445c28c8a70c24534ad2abc;p=btrtrc.git Create a pieceStore interface, and merge in my httpfile backend, and replace data/blob data/blob was aging, and had severe performance problems. It's now possible to use missinggo/filecache as a data backend to pieceStore which is better tested and performs excellently. --- diff --git a/client_test.go b/client_test.go index a0f0f2a1..d8707a9f 100644 --- a/client_test.go +++ b/client_test.go @@ -16,13 +16,15 @@ import ( _ "github.com/anacrolix/envpprof" "github.com/anacrolix/missinggo" . "github.com/anacrolix/missinggo" + "github.com/anacrolix/missinggo/filecache" "github.com/anacrolix/utp" "github.com/bradfitz/iter" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/anacrolix/torrent/bencode" - "github.com/anacrolix/torrent/data/blob" + "github.com/anacrolix/torrent/data/pieceStore" + "github.com/anacrolix/torrent/data/pieceStore/dataBackend/fileCache" "github.com/anacrolix/torrent/dht" "github.com/anacrolix/torrent/internal/testutil" "github.com/anacrolix/torrent/iplist" @@ -30,7 +32,7 @@ import ( ) func init() { - log.SetFlags(log.LstdFlags | log.Lshortfile) + log.SetFlags(log.LstdFlags | log.Llongfile) } var TestingConfig = Config{ @@ -266,10 +268,18 @@ func TestClientTransfer(t *testing.T) { // cfg.TorrentDataOpener = func(info *metainfo.Info) (data.Data, error) { // return blob.TorrentData(info, leecherDataDir), nil // } - blobStore := blob.NewStore(leecherDataDir) - cfg.TorrentDataOpener = func(info *metainfo.Info) Data { - return blobStore.OpenTorrent(info) - } + // blobStore := blob.NewStore(leecherDataDir) + // cfg.TorrentDataOpener = func(info *metainfo.Info) Data { + // return blobStore.OpenTorrent(info) + // } + cfg.TorrentDataOpener = func() TorrentDataOpener { + fc, err := filecache.NewCache(leecherDataDir) + require.NoError(t, err) + store := pieceStore.New(fileCacheDataBackend.New(fc)) + return func(mi *metainfo.Info) Data { + return store.OpenTorrentData(mi) + } + }() leecher, _ := NewClient(&cfg) defer leecher.Close() leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) { diff --git a/data.go b/data.go index 803d8476..9f8214fe 100644 --- a/data.go +++ b/data.go @@ -4,9 +4,11 @@ import "io" // Represents data storage for a Torrent. type Data interface { - ReadAt(p []byte, off int64) (n int, err error) + io.ReaderAt + io.WriterAt + // Bro, do you even io.Closer? Close() - WriteAt(p []byte, off int64) (n int, err error) + // If the data isn't available, err should be io.ErrUnexpectedEOF. WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) // We believe the piece data will pass a hash check. PieceCompleted(index int) error diff --git a/data/blob/store.go b/data/blob/store.go deleted file mode 100644 index 99f696ab..00000000 --- a/data/blob/store.go +++ /dev/null @@ -1,277 +0,0 @@ -// Implements torrent data storage as per-piece files. -package blob - -import ( - "bytes" - "crypto/sha1" - "encoding/hex" - "errors" - "fmt" - "io" - "os" - "path/filepath" - "sort" - "sync" - "time" - - "github.com/anacrolix/missinggo" - - "github.com/anacrolix/torrent/metainfo" -) - -const ( - filePerm = 0640 - dirPerm = 0750 -) - -type store struct { - baseDir string - capacity int64 - - mu sync.Mutex - completed map[[20]byte]struct{} -} - -func (me *store) OpenTorrent(info *metainfo.Info) *data { - return &data{info, me} -} - -type StoreOption func(*store) - -func Capacity(bytes int64) StoreOption { - return func(s *store) { - s.capacity = bytes - } -} - -func NewStore(baseDir string, opt ...StoreOption) *store { - s := &store{baseDir, -1, sync.Mutex{}, nil} - for _, o := range opt { - o(s) - } - s.initCompleted() - return s -} - -// Turns 40 byte hex string into its equivalent binary byte array. -func hexStringPieceHashArray(s string) (ret [20]byte, ok bool) { - if len(s) != 40 { - return - } - n, err := hex.Decode(ret[:], []byte(s)) - if err != nil { - return - } - if n != 20 { - panic(n) - } - ok = true - return -} - -func (me *store) initCompleted() { - fis, err := me.readCompletedDir() - if err != nil { - panic(err) - } - me.mu.Lock() - me.completed = make(map[[20]byte]struct{}, len(fis)) - for _, fi := range fis { - binHash, ok := hexStringPieceHashArray(fi.Name()) - if !ok { - continue - } - me.completed[binHash] = struct{}{} - } - me.mu.Unlock() -} - -func (me *store) completePieceDirPath() string { - return filepath.Join(me.baseDir, "completed") -} - -func (me *store) path(p metainfo.Piece, completed bool) string { - return filepath.Join(me.baseDir, func() string { - if completed { - return "completed" - } else { - return "incomplete" - } - }(), fmt.Sprintf("%x", p.Hash())) -} - -func sliceToPieceHashArray(b []byte) (ret [20]byte) { - n := copy(ret[:], b) - if n != 20 { - panic(n) - } - return -} - -func (me *store) pieceComplete(p metainfo.Piece) bool { - me.mu.Lock() - defer me.mu.Unlock() - _, ok := me.completed[sliceToPieceHashArray(p.Hash())] - return ok -} - -func (me *store) pieceWrite(p metainfo.Piece) (f *os.File) { - if me.pieceComplete(p) { - return - } - name := me.path(p, false) - os.MkdirAll(filepath.Dir(name), dirPerm) - f, err := os.OpenFile(name, os.O_CREATE|os.O_WRONLY, filePerm) - if err != nil { - panic(err) - } - return -} - -// Returns the file for the given piece, if it exists. It could be completed, -// or incomplete. -func (me *store) pieceRead(p metainfo.Piece) (f *os.File) { - f, err := os.Open(me.path(p, true)) - if err == nil { - return - } - if !os.IsNotExist(err) { - panic(err) - } - // Mark the file not completed, in case we thought it was. TODO: Trigger - // an asynchronous initCompleted to reinitialize the entire completed map - // as there are likely other files missing. - me.mu.Lock() - delete(me.completed, sliceToPieceHashArray(p.Hash())) - me.mu.Unlock() - f, err = os.Open(me.path(p, false)) - if err == nil { - return - } - if !os.IsNotExist(err) { - panic(err) - } - return -} - -func (me *store) readCompletedDir() (fis []os.FileInfo, err error) { - f, err := os.Open(me.completePieceDirPath()) - if err != nil { - if os.IsNotExist(err) { - err = nil - } - return - } - fis, err = f.Readdir(-1) - f.Close() - return -} - -func (me *store) removeCompleted(name string) (err error) { - err = os.Remove(filepath.Join(me.completePieceDirPath(), name)) - if os.IsNotExist(err) { - err = nil - } - if err != nil { - return err - } - binHash, ok := hexStringPieceHashArray(name) - if ok { - me.mu.Lock() - delete(me.completed, binHash) - me.mu.Unlock() - } - return -} - -type fileInfoSorter struct { - fis []os.FileInfo -} - -func (me fileInfoSorter) Len() int { - return len(me.fis) -} - -func lastTime(fi os.FileInfo) (ret time.Time) { - ret = fi.ModTime() - atime := missinggo.FileInfoAccessTime(fi) - if atime.After(ret) { - ret = atime - } - return -} - -func (me fileInfoSorter) Less(i, j int) bool { - return lastTime(me.fis[i]).Before(lastTime(me.fis[j])) -} - -func (me fileInfoSorter) Swap(i, j int) { - me.fis[i], me.fis[j] = me.fis[j], me.fis[i] -} - -func sortFileInfos(fis []os.FileInfo) { - sorter := fileInfoSorter{fis} - sort.Sort(sorter) -} - -func (me *store) makeSpace(space int64) error { - if me.capacity < 0 { - return nil - } - if space > me.capacity { - return errors.New("space requested exceeds capacity") - } - fis, err := me.readCompletedDir() - if err != nil { - return err - } - var size int64 - for _, fi := range fis { - size += fi.Size() - } - sortFileInfos(fis) - for size > me.capacity-space { - me.removeCompleted(fis[0].Name()) - size -= fis[0].Size() - fis = fis[1:] - } - return nil -} - -func (me *store) PieceCompleted(p metainfo.Piece) (err error) { - err = me.makeSpace(p.Length()) - if err != nil { - return - } - var ( - incompletePiecePath = me.path(p, false) - completedPiecePath = me.path(p, true) - ) - fSrc, err := os.Open(incompletePiecePath) - if err != nil { - return - } - defer fSrc.Close() - os.MkdirAll(filepath.Dir(completedPiecePath), dirPerm) - fDst, err := os.OpenFile(completedPiecePath, os.O_EXCL|os.O_CREATE|os.O_WRONLY, filePerm) - if err != nil { - return - } - defer fDst.Close() - hasher := sha1.New() - r := io.TeeReader(io.LimitReader(fSrc, p.Length()), hasher) - _, err = io.Copy(fDst, r) - if err != nil { - return - } - if !bytes.Equal(hasher.Sum(nil), p.Hash()) { - err = errors.New("piece incomplete") - os.Remove(completedPiecePath) - return - } - os.Remove(incompletePiecePath) - me.mu.Lock() - me.completed[sliceToPieceHashArray(p.Hash())] = struct{}{} - me.mu.Unlock() - return -} diff --git a/data/file/file.go b/data/file/file.go index bf77db2a..a24f4ebe 100644 --- a/data/file/file.go +++ b/data/file/file.go @@ -103,6 +103,9 @@ func (me data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err err } var f *os.File f, err = os.Open(me.fileInfoName(fi)) + if os.IsNotExist(err) { + err = io.ErrUnexpectedEOF + } if err != nil { return } diff --git a/data/blob/blob.go b/data/pieceStore/blob.go similarity index 70% rename from data/blob/blob.go rename to data/pieceStore/blob.go index 0afc047b..0ebdea25 100644 --- a/data/blob/blob.go +++ b/data/pieceStore/blob.go @@ -1,9 +1,8 @@ -package blob +package pieceStore import ( "encoding/hex" "io" - "log" "github.com/anacrolix/torrent/metainfo" ) @@ -19,6 +18,10 @@ func (me *data) pieceHashHex(i int) string { func (me *data) Close() {} +// TODO: Make sure that reading completed can't read from incomplete. Then +// also it'll be possible to verify that the Content-Range on completed +// returns the correct piece length so there aren't short reads. + func (me *data) ReadAt(b []byte, off int64) (n int, err error) { for len(b) != 0 { if off >= me.info.TotalLength() { @@ -26,27 +29,16 @@ func (me *data) ReadAt(b []byte, off int64) (n int, err error) { break } p := me.info.Piece(int(off / me.info.PieceLength)) - f := me.store.pieceRead(p) - if f == nil { - log.Println("piece not found", p) - err = io.ErrUnexpectedEOF - break - } b1 := b maxN1 := int(p.Length() - off%me.info.PieceLength) if len(b1) > maxN1 { b1 = b1[:maxN1] } var n1 int - n1, err = f.ReadAt(b1, off%me.info.PieceLength) - f.Close() + n1, err = me.store.pieceReadAt(p, b1, off%me.info.PieceLength) n += n1 off += int64(n1) b = b[n1:] - if err == io.EOF { - err = nil - break - } if err != nil { break } @@ -54,19 +46,18 @@ func (me *data) ReadAt(b []byte, off int64) (n int, err error) { return } +// TODO: Rewrite this later, on short writes to a piece it will start to play up. func (me *data) WriteAt(p []byte, off int64) (n int, err error) { i := int(off / me.info.PieceLength) off %= me.info.PieceLength for len(p) != 0 { - f := me.store.pieceWrite(me.info.Piece(i)) p1 := p maxN := me.info.Piece(i).Length() - off if int64(len(p1)) > maxN { p1 = p1[:maxN] } var n1 int - n1, err = f.WriteAt(p1, off) - f.Close() + n1, err = me.store.pieceWriteAt(me.info.Piece(i), p1, off) n += n1 if err != nil { return @@ -78,27 +69,23 @@ func (me *data) WriteAt(p []byte, off int64) (n int, err error) { return } -func (me *data) pieceReader(piece int, off int64) (ret io.ReadCloser, err error) { - f := me.store.pieceRead(me.info.Piece(piece)) - if f == nil { - err = io.ErrUnexpectedEOF - return - } - return struct { - io.Reader - io.Closer - }{ - Reader: io.NewSectionReader(f, off, me.info.Piece(piece).Length()-off), - Closer: f, - }, nil +func (me *data) pieceReader(p metainfo.Piece, off int64) (ret io.ReadCloser, err error) { + return me.store.getPieceRange(p, off, p.Length()-off) } func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) { i := int(off / me.info.PieceLength) off %= me.info.PieceLength for n != 0 { + if i >= me.info.NumPieces() { + break + } + p := me.info.Piece(i) + if off >= p.Length() { + break + } var pr io.ReadCloser - pr, err = me.pieceReader(i, off) + pr, err = me.pieceReader(p, off) if err != nil { return } diff --git a/data/pieceStore/dataBackend/fileCache/backend.go b/data/pieceStore/dataBackend/fileCache/backend.go new file mode 100644 index 00000000..701ae4aa --- /dev/null +++ b/data/pieceStore/dataBackend/fileCache/backend.go @@ -0,0 +1,63 @@ +package fileCacheDataBackend + +import ( + "io" + "os" + + "github.com/anacrolix/missinggo/filecache" + + "github.com/anacrolix/torrent/data/pieceStore/dataBackend" +) + +type backend struct { + c *filecache.Cache +} + +func New(fc *filecache.Cache) *backend { + return &backend{ + c: fc, + } +} + +var _ dataBackend.I = &backend{} + +func (me *backend) Delete(path string) (err error) { + err = me.c.Remove(path) + return +} + +func (me *backend) GetLength(path string) (ret int64, err error) { + f, err := me.c.OpenFile(path, 0) + if os.IsNotExist(err) { + err = dataBackend.ErrNotFound + } + if err != nil { + return + } + defer f.Close() + ret, err = f.Seek(0, os.SEEK_END) + return +} + +func (me *backend) Open(path string) (ret dataBackend.File, err error) { + ret, err = me.c.OpenFile(path, os.O_RDWR|os.O_CREATE) + return +} + +func (me *backend) OpenSection(path string, off, n int64) (ret io.ReadCloser, err error) { + f, err := me.c.OpenFile(path, os.O_RDONLY) + if os.IsNotExist(err) { + err = dataBackend.ErrNotFound + } + if err != nil { + return + } + ret = struct { + io.Reader + io.Closer + }{ + io.NewSectionReader(f, off, n), + f, + } + return +} diff --git a/data/pieceStore/dataBackend/http/backend.go b/data/pieceStore/dataBackend/http/backend.go new file mode 100644 index 00000000..85fc7840 --- /dev/null +++ b/data/pieceStore/dataBackend/http/backend.go @@ -0,0 +1,65 @@ +package httpDataBackend + +import ( + "io" + "net/http" + "net/url" + "path" + + "github.com/anacrolix/missinggo" + "github.com/anacrolix/missinggo/httpfile" + + "github.com/anacrolix/torrent/data/pieceStore/dataBackend" +) + +var client = http.DefaultClient + +type backend struct { + // Backend URL. + url url.URL +} + +func New(u url.URL) *backend { + return &backend{ + url: *missinggo.CopyURL(&u), + } +} + +var _ dataBackend.I = &backend{} + +func fixErrNotFound(err error) error { + if err == httpfile.ErrNotFound { + return dataBackend.ErrNotFound + } + return err +} + +func (me *backend) urlStr(_path string) string { + u := me.url + u.Path = path.Join(u.Path, _path) + return u.String() +} + +func (me *backend) Delete(path string) (err error) { + err = httpfile.Delete(me.urlStr(path)) + err = fixErrNotFound(err) + return +} + +func (me *backend) GetLength(path string) (ret int64, err error) { + ret, err = httpfile.GetLength(me.urlStr(path)) + err = fixErrNotFound(err) + return +} + +func (me *backend) Open(path string) (ret dataBackend.File, err error) { + ret = httpfile.Open(me.urlStr(path)) + err = fixErrNotFound(err) + return +} + +func (me *backend) OpenSection(path string, off, n int64) (ret io.ReadCloser, err error) { + ret, err = httpfile.OpenSectionReader(me.urlStr(path), off, n) + err = fixErrNotFound(err) + return +} diff --git a/data/pieceStore/dataBackend/i.go b/data/pieceStore/dataBackend/i.go new file mode 100644 index 00000000..6b5beb9c --- /dev/null +++ b/data/pieceStore/dataBackend/i.go @@ -0,0 +1,22 @@ +package dataBackend + +import ( + "errors" + "io" +) + +// All functions must return ErrNotFound as required. +type I interface { + GetLength(path string) (int64, error) + Open(path string) (File, error) + OpenSection(path string, off, n int64) (io.ReadCloser, error) + Delete(path string) error +} + +var ErrNotFound = errors.New("not found") + +type File interface { + io.Closer + io.Seeker + io.Writer +} diff --git a/data/pieceStore/store.go b/data/pieceStore/store.go new file mode 100644 index 00000000..3b1970fe --- /dev/null +++ b/data/pieceStore/store.go @@ -0,0 +1,292 @@ +package pieceStore + +import ( + "bytes" + "crypto/sha1" + "encoding/hex" + "errors" + "io" + "log" + "os" + "path" + "strconv" + "sync" + "time" + + "github.com/anacrolix/torrent/data/pieceStore/dataBackend" + "github.com/anacrolix/torrent/metainfo" +) + +type store struct { + db dataBackend.I + // Limit backend requests. + requestPool chan struct{} + + mu sync.Mutex + // The cached completion state for pieces. + completion map[[20]byte]bool + lastError time.Time +} + +func (me *store) completedPiecePath(p metainfo.Piece) string { + return path.Join("completed", hex.EncodeToString(p.Hash())) +} + +func (me *store) incompletePiecePath(p metainfo.Piece) string { + return path.Join( + "incomplete", + strconv.FormatInt(int64(os.Getpid()), 10), + hex.EncodeToString(p.Hash())) +} + +func (me *store) OpenTorrentData(info *metainfo.Info) *data { + return &data{info, me} +} + +func New(db dataBackend.I) *store { + s := &store{ + db: db, + requestPool: make(chan struct{}, 5), + } + return s +} + +// Turns 40 byte hex string into its equivalent binary byte array. +func hexStringPieceHashArray(s string) (ret [20]byte, ok bool) { + if len(s) != 40 { + return + } + n, err := hex.Decode(ret[:], []byte(s)) + if err != nil { + return + } + if n != 20 { + panic(n) + } + ok = true + return +} + +func sliceToPieceHashArray(b []byte) (ret [20]byte) { + n := copy(ret[:], b) + if n != 20 { + panic(n) + } + return +} + +func pieceHashArray(p metainfo.Piece) [20]byte { + return sliceToPieceHashArray(p.Hash()) +} + +func (me *store) completionKnown(p metainfo.Piece) bool { + me.mu.Lock() + _, ok := me.completion[pieceHashArray(p)] + me.mu.Unlock() + return ok +} + +func (me *store) isComplete(p metainfo.Piece) bool { + me.mu.Lock() + ret, _ := me.completion[pieceHashArray(p)] + me.mu.Unlock() + return ret +} + +func (me *store) setCompletion(p metainfo.Piece, complete bool) { + me.mu.Lock() + if me.completion == nil { + me.completion = make(map[[20]byte]bool) + } + me.completion[pieceHashArray(p)] = complete + me.mu.Unlock() +} + +func (me *store) pieceComplete(p metainfo.Piece) bool { + if me.completionKnown(p) { + return me.isComplete(p) + } + // Prevent a errors from stalling the caller. + if !me.lastError.IsZero() && time.Since(me.lastError) < time.Second { + return false + } + me.requestPool <- struct{}{} + defer func() { + <-me.requestPool + }() + length, err := me.db.GetLength(me.completedPiecePath(p)) + if err == dataBackend.ErrNotFound { + me.setCompletion(p, false) + return false + } + if err != nil { + me.lastError = time.Now() + log.Printf("%+v", err) + return false + } + complete := length == p.Length() + if !complete { + log.Printf("completed piece %x has wrong length: %d", p.Hash(), length) + } + me.setCompletion(p, complete) + return complete +} + +func (me *store) pieceWriteAt(p metainfo.Piece, b []byte, off int64) (n int, err error) { + if me.pieceComplete(p) { + err = errors.New("already have piece") + return + } + me.requestPool <- struct{}{} + defer func() { + <-me.requestPool + }() + f, err := me.db.Open(me.incompletePiecePath(p)) + if err != nil { + return + } + defer func() { + closeErr := f.Close() + if err == nil { + err = closeErr + } + }() + _, err = f.Seek(off, os.SEEK_SET) + if err != nil { + return + } + n, err = f.Write(b) + return +} + +// Wraps a Closer, releases a slot from a channel pool the first time Close is +// called. +type poolCloser struct { + mu sync.Mutex + released bool + pool <-chan struct{} + io.Closer +} + +func (me *poolCloser) Close() (err error) { + err = me.Closer.Close() + me.mu.Lock() + if !me.released { + <-me.pool + me.released = true + } + me.mu.Unlock() + return +} + +func (me *store) forgetCompletions() { + me.mu.Lock() + me.completion = nil + me.mu.Unlock() +} + +func (me *store) getPieceRange(p metainfo.Piece, off, n int64) (ret io.ReadCloser, err error) { + me.requestPool <- struct{}{} + rc, err := me.db.OpenSection(me.completedPiecePath(p), off, n) + if err == dataBackend.ErrNotFound { + if me.isComplete(p) { + me.forgetCompletions() + } + me.setCompletion(p, false) + rc, err = me.db.OpenSection(me.incompletePiecePath(p), off, n) + } + if err == dataBackend.ErrNotFound { + <-me.requestPool + err = io.ErrUnexpectedEOF + return + } + if err != nil { + <-me.requestPool + return + } + // Wrap up the response body so that the request slot is released when the + // response body is closed. + ret = struct { + io.Reader + io.Closer + }{ + rc, + &poolCloser{ + pool: me.requestPool, + Closer: rc, + }, + } + return +} + +func (me *store) pieceReadAt(p metainfo.Piece, b []byte, off int64) (n int, err error) { + rc, err := me.getPieceRange(p, off, int64(len(b))) + if err != nil { + return + } + defer rc.Close() + n, err = io.ReadFull(rc, b) + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + return +} + +func (me *store) removePath(path string) (err error) { + me.requestPool <- struct{}{} + defer func() { + <-me.requestPool + }() + err = me.db.Delete(path) + return +} + +// Remove the completed piece if it exists, and mark the piece not completed. +// Mustn't fail. +func (me *store) deleteCompleted(p metainfo.Piece) { + if err := me.removePath(me.completedPiecePath(p)); err != nil { + panic(err) + } + me.setCompletion(p, false) +} + +func (me *store) hashCopyFile(from, to string, n int64) (hash []byte, err error) { + // Yes, 2 requests occur here simultaneously, but we're not trying to be + // pedantic. + me.requestPool <- struct{}{} + defer func() { + <-me.requestPool + }() + src, err := me.db.OpenSection(from, 0, n) + if err != nil { + return + } + defer src.Close() + hasher := sha1.New() + tee := io.TeeReader(src, hasher) + dest, err := me.db.Open(to) + if err != nil { + return + } + defer dest.Close() + _, err = io.Copy(dest, tee) + if err != nil { + return + } + hash = hasher.Sum(nil) + return +} + +func (me *store) PieceCompleted(p metainfo.Piece) (err error) { + hash, err := me.hashCopyFile(me.incompletePiecePath(p), me.completedPiecePath(p), p.Length()) + if err == nil && !bytes.Equal(hash, p.Hash()) { + err = errors.New("piece incomplete") + } + if err != nil { + me.deleteCompleted(p) + return + } + me.removePath(me.incompletePiecePath(p)) + me.setCompletion(p, true) + return +} diff --git a/torrent.go b/torrent.go index cb130007..e1faf0ae 100644 --- a/torrent.go +++ b/torrent.go @@ -662,7 +662,17 @@ func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) { p.noPendingWrites.Wait() } p.pendingWritesMutex.Unlock() - t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength) + pl := t.Info.Piece(int(piece)).Length() + n, err := t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, pl) + if err != nil { + if err != io.ErrUnexpectedEOF { + log.Printf("error hashing piece with %T: %s", t.data, err) + } + return + } + if n != pl { + panic("lame") + } missinggo.CopyExact(ps[:], hash.Sum(nil)) return }