]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Make blob data stateful
authorMatt Joiner <anacrolix@gmail.com>
Fri, 27 Feb 2015 01:45:55 +0000 (12:45 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 27 Feb 2015 01:45:55 +0000 (12:45 +1100)
client.go
client_test.go
data/blob/blob.go
fs/torrentfs_test.go
misc.go
torrent.go

index 196f67a5122fb11ebd128520c6f97c7b61152b41..b96f67d0350490625a748918926c2e079d07c170 100644 (file)
--- a/client.go
+++ b/client.go
@@ -99,7 +99,7 @@ func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) {
 // been checked before.
 func (cl *Client) queueFirstHash(t *torrent, piece int) {
        p := t.Pieces[piece]
-       if p.EverHashed || p.Hashing || p.QueuedForHash {
+       if p.EverHashed || p.Hashing || p.QueuedForHash || p.Complete() {
                return
        }
        cl.queuePieceCheck(t, pp.Integer(piece))
@@ -409,7 +409,7 @@ func NewClient(cfg *Config) (cl *Client, err error) {
                disableTCP:       cfg.DisableTCP,
                _configDir:       cfg.ConfigDir,
                config:           *cfg,
-               torrentDataOpener: func(md *metainfo.Info) (Data, error) {
+               torrentDataOpener: func(md *metainfo.Info) (StatelessData, error) {
                        return filePkg.TorrentData(md, cfg.DataDir), nil
                },
 
@@ -1553,7 +1553,16 @@ func (cl *Client) setStorage(t *torrent, td Data) (err error) {
        return
 }
 
-type TorrentDataOpener func(*metainfo.Info) (Data, error)
+type TorrentDataOpener func(*metainfo.Info) (StatelessData, error)
+
+type statelessDataWrapper struct {
+       StatelessData
+}
+
+func (statelessDataWrapper) PieceComplete(int) bool   { return false }
+func (statelessDataWrapper) PieceCompleted(int) error { return nil }
+
+var _ Data = statelessDataWrapper{}
 
 func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
        err = t.setMetadata(md, bytes, &cl.mu)
@@ -1571,10 +1580,14 @@ func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err e
                return
        }
        close(t.gotMetainfo)
-       td, err := cl.torrentDataOpener(&md)
+       stateless, err := cl.torrentDataOpener(&md)
        if err != nil {
                return
        }
+       td, ok := stateless.(Data)
+       if !ok {
+               td = statelessDataWrapper{stateless}
+       }
        err = cl.setStorage(t, td)
        return
 }
@@ -2205,9 +2218,17 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
                failedPieceHashes.Add(1)
        }
        p.EverHashed = true
+       if correct {
+               err := t.data.PieceCompleted(int(piece))
+               if err != nil {
+                       log.Printf("error completing piece: %s", err)
+                       correct = false
+               }
+       }
        if correct {
                p.Priority = piecePriorityNone
                p.PendingChunkSpecs = nil
+               p.complete = true
                p.Event.Broadcast()
                me.downloadStrategy.TorrentGotPiece(t, int(piece))
        } else {
@@ -2250,11 +2271,11 @@ func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
        for p.Hashing || t.data == nil {
                cl.event.Wait()
        }
-       if t.isClosed() {
+       p.QueuedForHash = false
+       if t.isClosed() || p.complete {
                return
        }
        p.Hashing = true
-       p.QueuedForHash = false
        cl.mu.Unlock()
        sum := t.hashPiece(index)
        cl.mu.Lock()
index d873fbe1a7e37e4a68b8a286136f0b187626b735..5dc8a4d3292b2b9cfb0200cbef4b434c363b3993 100644 (file)
@@ -254,7 +254,7 @@ func TestClientTransfer(t *testing.T) {
                t.Fatal(err)
        }
        defer os.RemoveAll(leecherDataDir)
-       cfg.TorrentDataOpener = func(info *metainfo.Info) (Data, error) {
+       cfg.TorrentDataOpener = func(info *metainfo.Info) (StatelessData, error) {
                return blob.TorrentData(info, leecherDataDir), nil
        }
        leecher, _ := NewClient(&cfg)
index 43657f2ada9fffca18edbc30d4148422a36fe6e1..f8ece667a8a17c71cf57c85b42e17048e787075b 100644 (file)
@@ -1,14 +1,22 @@
 package blob
 
 import (
+       "bytes"
+       "crypto/sha1"
        "encoding/hex"
        "errors"
        "io"
        "os"
+       "path/filepath"
 
        "github.com/anacrolix/libtorgo/metainfo"
 )
 
+const (
+       filePerm = 0640
+       dirPerm  = 0750
+)
+
 type data struct {
        info    *metainfo.Info
        baseDir string
@@ -125,3 +133,47 @@ func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err er
        }
        return
 }
+
+func (me *data) incompletePiecePath(piece int) string {
+       return filepath.Join(me.baseDir, "incomplete", me.pieceHashHex(piece))
+}
+
+func (me *data) completedPiecePath(piece int) string {
+       return filepath.Join(me.baseDir, "complete", me.pieceHashHex(piece))
+}
+
+func (me *data) PieceCompleted(index int) (err error) {
+       var (
+               incompletePiecePath = me.incompletePiecePath(index)
+               completedPiecePath  = me.completedPiecePath(index)
+       )
+       fSrc, err := os.Open(incompletePiecePath)
+       if err != nil {
+               return
+       }
+       defer fSrc.Close()
+       os.MkdirAll(filepath.Dir(completedPiecePath), dirPerm)
+       fDst, err := os.OpenFile(completedPiecePath, os.O_EXCL|os.O_CREATE|os.O_WRONLY, filePerm)
+       if err != nil {
+               return
+       }
+       defer fDst.Close()
+       hasher := sha1.New()
+       r := io.TeeReader(io.LimitReader(fSrc, me.info.Piece(index).Length()), hasher)
+       _, err = io.Copy(fDst, r)
+       if err != nil {
+               return
+       }
+       if !bytes.Equal(hasher.Sum(nil), me.info.Piece(index).Hash()) {
+               err = errors.New("piece incomplete")
+               os.Remove(completedPiecePath)
+               return
+       }
+       os.Remove(incompletePiecePath)
+       return
+}
+
+func (me *data) PieceComplete(piece int) bool {
+       _, err := os.Stat(me.completedPiecePath(piece))
+       return err == nil
+}
index 69df66b02bb3db0891696844c2a138980a9fa9ca..4c865ba4e483cc7759fe68ebb6e5f42e49f2c8a1 100644 (file)
@@ -191,7 +191,7 @@ func TestDownloadOnDemand(t *testing.T) {
 
                NoDefaultBlocklist: true,
 
-               TorrentDataOpener: func(info *metainfo.Info) (torrent.Data, error) {
+               TorrentDataOpener: func(info *metainfo.Info) (torrent.StatelessData, error) {
                        return mmap.TorrentData(info, filepath.Join(layout.BaseDir, "download"))
                },
 
diff --git a/misc.go b/misc.go
index f14c8f74d9d752b1c605bcb91954b7ac5801d17c..c3da1048680fa215a7550ad7a94d856c349f0e03 100644 (file)
--- a/misc.go
+++ b/misc.go
@@ -45,6 +45,7 @@ const (
 
 type piece struct {
        Hash              pieceSum
+       complete          bool
        PendingChunkSpecs map[chunkSpec]struct{}
        Hashing           bool
        QueuedForHash     bool
@@ -72,7 +73,7 @@ func (p *piece) shuffledPendingChunkSpecs() (css []chunkSpec) {
 }
 
 func (p *piece) Complete() bool {
-       return len(p.PendingChunkSpecs) == 0 && p.EverHashed
+       return p.complete
 }
 
 func lastChunkSpec(pieceLength peer_protocol.Integer) (cs chunkSpec) {
index 3804e0675f7c2121f15e1e118dba04bbe783d7f3..5217db5646e275839d6b1be5d390dcf6e403341f 100644 (file)
@@ -38,13 +38,21 @@ type peersKey struct {
        Port    int
 }
 
-type Data interface {
+type StatelessData interface {
        ReadAt(p []byte, off int64) (n int, err error)
        Close()
        WriteAt(p []byte, off int64) (n int, err error)
        WriteSectionTo(w io.Writer, off, n int64) (written int64, err error)
 }
 
+type Data interface {
+       StatelessData
+       // We believe the piece data will pass a hash check.
+       PieceCompleted(index int) error
+       // Returns true if the piece is complete.
+       PieceComplete(index int) bool
+}
+
 // Is not aware of Client. Maintains state of torrent for with-in a Client.
 type torrent struct {
        stateMu sync.Mutex
@@ -211,6 +219,9 @@ func (t *torrent) setStorage(td Data) (err error) {
                t.data.Close()
        }
        t.data = td
+       for i, p := range t.Pieces {
+               p.complete = t.data.PieceComplete(i)
+       }
        return
 }
 
@@ -586,6 +597,7 @@ func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) {
        util.CopyExact(ps[:], hash.Sum(nil))
        return
 }
+
 func (t *torrent) haveAllPieces() bool {
        if !t.haveInfo() {
                return false