client.go | 33 +++++++++++++++++++++++++++------ client_test.go | 2 +- data/blob/blob.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++++ fs/torrentfs_test.go | 2 +- misc.go | 3 ++- torrent.go | 14 +++++++++++++- diff --git a/client.go b/client.go index 196f67a5122fb11ebd128520c6f97c7b61152b41..b96f67d0350490625a748918926c2e079d07c170 100644 --- a/client.go +++ b/client.go @@ -99,7 +99,7 @@ // Queue a piece check if one isn't already queued, and the piece has never // been checked before. func (cl *Client) queueFirstHash(t *torrent, piece int) { p := t.Pieces[piece] - if p.EverHashed || p.Hashing || p.QueuedForHash { + if p.EverHashed || p.Hashing || p.QueuedForHash || p.Complete() { return } cl.queuePieceCheck(t, pp.Integer(piece)) @@ -409,7 +409,7 @@ disableUTP: cfg.DisableUTP, disableTCP: cfg.DisableTCP, _configDir: cfg.ConfigDir, config: *cfg, - torrentDataOpener: func(md *metainfo.Info) (Data, error) { + torrentDataOpener: func(md *metainfo.Info) (StatelessData, error) { return filePkg.TorrentData(md, cfg.DataDir), nil }, @@ -1553,7 +1553,16 @@ cl.startTorrent(t) return } -type TorrentDataOpener func(*metainfo.Info) (Data, error) +type TorrentDataOpener func(*metainfo.Info) (StatelessData, error) + +type statelessDataWrapper struct { + StatelessData +} + +func (statelessDataWrapper) PieceComplete(int) bool { return false } +func (statelessDataWrapper) PieceCompleted(int) error { return nil } + +var _ Data = statelessDataWrapper{} func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) { err = t.setMetadata(md, bytes, &cl.mu) @@ -1571,9 +1580,13 @@ err = errors.New("no porn plx") return } close(t.gotMetainfo) - td, err := cl.torrentDataOpener(&md) + stateless, err := cl.torrentDataOpener(&md) if err != nil { return + } + td, ok := stateless.(Data) + if !ok { + td = statelessDataWrapper{stateless} } err = cl.setStorage(t, td) return @@ -2206,8 +2219,16 @@ failedPieceHashes.Add(1) } p.EverHashed = true if correct { + err := t.data.PieceCompleted(int(piece)) + if err != nil { + log.Printf("error completing piece: %s", err) + correct = false + } + } + if correct { p.Priority = piecePriorityNone p.PendingChunkSpecs = nil + p.complete = true p.Event.Broadcast() me.downloadStrategy.TorrentGotPiece(t, int(piece)) } else { @@ -2250,11 +2271,11 @@ p := t.Pieces[index] for p.Hashing || t.data == nil { cl.event.Wait() } - if t.isClosed() { + p.QueuedForHash = false + if t.isClosed() || p.complete { return } p.Hashing = true - p.QueuedForHash = false cl.mu.Unlock() sum := t.hashPiece(index) cl.mu.Lock() diff --git a/client_test.go b/client_test.go index d873fbe1a7e37e4a68b8a286136f0b187626b735..5dc8a4d3292b2b9cfb0200cbef4b434c363b3993 100644 --- a/client_test.go +++ b/client_test.go @@ -254,7 +254,7 @@ if err != nil { t.Fatal(err) } defer os.RemoveAll(leecherDataDir) - cfg.TorrentDataOpener = func(info *metainfo.Info) (Data, error) { + cfg.TorrentDataOpener = func(info *metainfo.Info) (StatelessData, error) { return blob.TorrentData(info, leecherDataDir), nil } leecher, _ := NewClient(&cfg) diff --git a/data/blob/blob.go b/data/blob/blob.go index 43657f2ada9fffca18edbc30d4148422a36fe6e1..f8ece667a8a17c71cf57c85b42e17048e787075b 100644 --- a/data/blob/blob.go +++ b/data/blob/blob.go @@ -1,12 +1,20 @@ package blob import ( + "bytes" + "crypto/sha1" "encoding/hex" "errors" "io" "os" + "path/filepath" "github.com/anacrolix/libtorgo/metainfo" +) + +const ( + filePerm = 0640 + dirPerm = 0750 ) type data struct { @@ -125,3 +133,47 @@ off = 0 } return } + +func (me *data) incompletePiecePath(piece int) string { + return filepath.Join(me.baseDir, "incomplete", me.pieceHashHex(piece)) +} + +func (me *data) completedPiecePath(piece int) string { + return filepath.Join(me.baseDir, "complete", me.pieceHashHex(piece)) +} + +func (me *data) PieceCompleted(index int) (err error) { + var ( + incompletePiecePath = me.incompletePiecePath(index) + completedPiecePath = me.completedPiecePath(index) + ) + fSrc, err := os.Open(incompletePiecePath) + if err != nil { + return + } + defer fSrc.Close() + os.MkdirAll(filepath.Dir(completedPiecePath), dirPerm) + fDst, err := os.OpenFile(completedPiecePath, os.O_EXCL|os.O_CREATE|os.O_WRONLY, filePerm) + if err != nil { + return + } + defer fDst.Close() + hasher := sha1.New() + r := io.TeeReader(io.LimitReader(fSrc, me.info.Piece(index).Length()), hasher) + _, err = io.Copy(fDst, r) + if err != nil { + return + } + if !bytes.Equal(hasher.Sum(nil), me.info.Piece(index).Hash()) { + err = errors.New("piece incomplete") + os.Remove(completedPiecePath) + return + } + os.Remove(incompletePiecePath) + return +} + +func (me *data) PieceComplete(piece int) bool { + _, err := os.Stat(me.completedPiecePath(piece)) + return err == nil +} diff --git a/fs/torrentfs_test.go b/fs/torrentfs_test.go index 69df66b02bb3db0891696844c2a138980a9fa9ca..4c865ba4e483cc7759fe68ebb6e5f42e49f2c8a1 100644 --- a/fs/torrentfs_test.go +++ b/fs/torrentfs_test.go @@ -191,7 +191,7 @@ DisableTCP: true, NoDefaultBlocklist: true, - TorrentDataOpener: func(info *metainfo.Info) (torrent.Data, error) { + TorrentDataOpener: func(info *metainfo.Info) (torrent.StatelessData, error) { return mmap.TorrentData(info, filepath.Join(layout.BaseDir, "download")) }, diff --git a/misc.go b/misc.go index f14c8f74d9d752b1c605bcb91954b7ac5801d17c..c3da1048680fa215a7550ad7a94d856c349f0e03 100644 --- a/misc.go +++ b/misc.go @@ -45,6 +45,7 @@ ) type piece struct { Hash pieceSum + complete bool PendingChunkSpecs map[chunkSpec]struct{} Hashing bool QueuedForHash bool @@ -72,7 +73,7 @@ return } func (p *piece) Complete() bool { - return len(p.PendingChunkSpecs) == 0 && p.EverHashed + return p.complete } func lastChunkSpec(pieceLength peer_protocol.Integer) (cs chunkSpec) { diff --git a/torrent.go b/torrent.go index 3804e0675f7c2121f15e1e118dba04bbe783d7f3..5217db5646e275839d6b1be5d390dcf6e403341f 100644 --- a/torrent.go +++ b/torrent.go @@ -38,11 +38,19 @@ IPBytes string Port int } -type Data interface { +type StatelessData interface { ReadAt(p []byte, off int64) (n int, err error) Close() WriteAt(p []byte, off int64) (n int, err error) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) +} + +type Data interface { + StatelessData + // We believe the piece data will pass a hash check. + PieceCompleted(index int) error + // Returns true if the piece is complete. + PieceComplete(index int) bool } // Is not aware of Client. Maintains state of torrent for with-in a Client. @@ -211,6 +219,9 @@ if t.data != nil { t.data.Close() } t.data = td + for i, p := range t.Pieces { + p.complete = t.data.PieceComplete(i) + } return } @@ -586,6 +597,7 @@ t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength) util.CopyExact(ps[:], hash.Sum(nil)) return } + func (t *torrent) haveAllPieces() bool { if !t.haveInfo() { return false