// been checked before.
func (cl *Client) queueFirstHash(t *torrent, piece int) {
p := t.Pieces[piece]
- if p.EverHashed || p.Hashing || p.QueuedForHash {
+ if p.EverHashed || p.Hashing || p.QueuedForHash || p.Complete() {
return
}
cl.queuePieceCheck(t, pp.Integer(piece))
disableTCP: cfg.DisableTCP,
_configDir: cfg.ConfigDir,
config: *cfg,
- torrentDataOpener: func(md *metainfo.Info) (Data, error) {
+ torrentDataOpener: func(md *metainfo.Info) (StatelessData, error) {
return filePkg.TorrentData(md, cfg.DataDir), nil
},
return
}
-type TorrentDataOpener func(*metainfo.Info) (Data, error)
+type TorrentDataOpener func(*metainfo.Info) (StatelessData, error)
+
+type statelessDataWrapper struct {
+ StatelessData
+}
+
+func (statelessDataWrapper) PieceComplete(int) bool { return false }
+func (statelessDataWrapper) PieceCompleted(int) error { return nil }
+
+var _ Data = statelessDataWrapper{}
func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
err = t.setMetadata(md, bytes, &cl.mu)
return
}
close(t.gotMetainfo)
- td, err := cl.torrentDataOpener(&md)
+ stateless, err := cl.torrentDataOpener(&md)
if err != nil {
return
}
+ td, ok := stateless.(Data)
+ if !ok {
+ td = statelessDataWrapper{stateless}
+ }
err = cl.setStorage(t, td)
return
}
failedPieceHashes.Add(1)
}
p.EverHashed = true
+ if correct {
+ err := t.data.PieceCompleted(int(piece))
+ if err != nil {
+ log.Printf("error completing piece: %s", err)
+ correct = false
+ }
+ }
if correct {
p.Priority = piecePriorityNone
p.PendingChunkSpecs = nil
+ p.complete = true
p.Event.Broadcast()
me.downloadStrategy.TorrentGotPiece(t, int(piece))
} else {
for p.Hashing || t.data == nil {
cl.event.Wait()
}
- if t.isClosed() {
+ p.QueuedForHash = false
+ if t.isClosed() || p.complete {
return
}
p.Hashing = true
- p.QueuedForHash = false
cl.mu.Unlock()
sum := t.hashPiece(index)
cl.mu.Lock()
t.Fatal(err)
}
defer os.RemoveAll(leecherDataDir)
- cfg.TorrentDataOpener = func(info *metainfo.Info) (Data, error) {
+ cfg.TorrentDataOpener = func(info *metainfo.Info) (StatelessData, error) {
return blob.TorrentData(info, leecherDataDir), nil
}
leecher, _ := NewClient(&cfg)
package blob
import (
+ "bytes"
+ "crypto/sha1"
"encoding/hex"
"errors"
"io"
"os"
+ "path/filepath"
"github.com/anacrolix/libtorgo/metainfo"
)
+const (
+ filePerm = 0640
+ dirPerm = 0750
+)
+
type data struct {
info *metainfo.Info
baseDir string
}
return
}
+
+func (me *data) incompletePiecePath(piece int) string {
+ return filepath.Join(me.baseDir, "incomplete", me.pieceHashHex(piece))
+}
+
+func (me *data) completedPiecePath(piece int) string {
+ return filepath.Join(me.baseDir, "complete", me.pieceHashHex(piece))
+}
+
+func (me *data) PieceCompleted(index int) (err error) {
+ var (
+ incompletePiecePath = me.incompletePiecePath(index)
+ completedPiecePath = me.completedPiecePath(index)
+ )
+ fSrc, err := os.Open(incompletePiecePath)
+ if err != nil {
+ return
+ }
+ defer fSrc.Close()
+ os.MkdirAll(filepath.Dir(completedPiecePath), dirPerm)
+ fDst, err := os.OpenFile(completedPiecePath, os.O_EXCL|os.O_CREATE|os.O_WRONLY, filePerm)
+ if err != nil {
+ return
+ }
+ defer fDst.Close()
+ hasher := sha1.New()
+ r := io.TeeReader(io.LimitReader(fSrc, me.info.Piece(index).Length()), hasher)
+ _, err = io.Copy(fDst, r)
+ if err != nil {
+ return
+ }
+ if !bytes.Equal(hasher.Sum(nil), me.info.Piece(index).Hash()) {
+ err = errors.New("piece incomplete")
+ os.Remove(completedPiecePath)
+ return
+ }
+ os.Remove(incompletePiecePath)
+ return
+}
+
+func (me *data) PieceComplete(piece int) bool {
+ _, err := os.Stat(me.completedPiecePath(piece))
+ return err == nil
+}
NoDefaultBlocklist: true,
- TorrentDataOpener: func(info *metainfo.Info) (torrent.Data, error) {
+ TorrentDataOpener: func(info *metainfo.Info) (torrent.StatelessData, error) {
return mmap.TorrentData(info, filepath.Join(layout.BaseDir, "download"))
},
type piece struct {
Hash pieceSum
+ complete bool
PendingChunkSpecs map[chunkSpec]struct{}
Hashing bool
QueuedForHash bool
}
func (p *piece) Complete() bool {
- return len(p.PendingChunkSpecs) == 0 && p.EverHashed
+ return p.complete
}
func lastChunkSpec(pieceLength peer_protocol.Integer) (cs chunkSpec) {
Port int
}
-type Data interface {
+type StatelessData interface {
ReadAt(p []byte, off int64) (n int, err error)
Close()
WriteAt(p []byte, off int64) (n int, err error)
WriteSectionTo(w io.Writer, off, n int64) (written int64, err error)
}
+type Data interface {
+ StatelessData
+ // We believe the piece data will pass a hash check.
+ PieceCompleted(index int) error
+ // Returns true if the piece is complete.
+ PieceComplete(index int) bool
+}
+
// Is not aware of Client. Maintains state of torrent for with-in a Client.
type torrent struct {
stateMu sync.Mutex
t.data.Close()
}
t.data = td
+ for i, p := range t.Pieces {
+ p.complete = t.data.PieceComplete(i)
+ }
return
}
util.CopyExact(ps[:], hash.Sum(nil))
return
}
+
func (t *torrent) haveAllPieces() bool {
if !t.haveInfo() {
return false