]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Rework piece completion
authorMatt Joiner <anacrolix@gmail.com>
Tue, 10 Mar 2015 15:41:21 +0000 (02:41 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 10 Mar 2015 15:41:21 +0000 (02:41 +1100)
client.go
data/blob/blob.go
data/blob/store.go [new file with mode: 0644]
download_strategies.go [deleted file]
misc.go
stateless.go [new file with mode: 0644]
torrent.go

index ce3289f08f49ae6048261525717878da20fa146b..5701834e6de9927346c45b490be50f955572fb71 100644 (file)
--- a/client.go
+++ b/client.go
@@ -100,29 +100,28 @@ func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) {
 // been checked before.
 func (cl *Client) queueFirstHash(t *torrent, piece int) {
        p := t.Pieces[piece]
-       if p.EverHashed || p.Hashing || p.QueuedForHash || p.Complete() {
+       if p.EverHashed || p.Hashing || p.QueuedForHash || t.pieceComplete(piece) {
                return
        }
        cl.queuePieceCheck(t, pp.Integer(piece))
 }
 
 type Client struct {
-       noUpload         bool
-       dataDir          string
-       halfOpenLimit    int
-       peerID           [20]byte
-       listeners        []net.Listener
-       utpSock          *utp.Socket
-       disableTrackers  bool
-       downloadStrategy downloadStrategy
-       dHT              *dht.Server
-       disableUTP       bool
-       disableTCP       bool
-       ipBlockList      *iplist.IPList
-       bannedTorrents   map[InfoHash]struct{}
-       _configDir       string
-       config           Config
-       pruneTimer       *time.Timer
+       noUpload        bool
+       dataDir         string
+       halfOpenLimit   int
+       peerID          [20]byte
+       listeners       []net.Listener
+       utpSock         *utp.Socket
+       disableTrackers bool
+       dHT             *dht.Server
+       disableUTP      bool
+       disableTCP      bool
+       ipBlockList     *iplist.IPList
+       bannedTorrents  map[InfoHash]struct{}
+       _configDir      string
+       config          Config
+       pruneTimer      *time.Timer
 
        torrentDataOpener TorrentDataOpener
 
@@ -274,7 +273,10 @@ func (cl *Client) prepareRead(t *torrent, off int64) (n int64) {
        }
        piece := t.Pieces[index]
        cl.readRaisePiecePriorities(t, off)
-       for !piece.Complete() && !t.isClosed() {
+       for !t.pieceComplete(index) && !t.isClosed() {
+               // This is to prevent being starved if a piece is dropped before we
+               // can read it.
+               cl.readRaisePiecePriorities(t, off)
                piece.Event.Wait()
        }
        return t.Info.Piece(index).Length() - off%t.Info.PieceLength
@@ -292,6 +294,7 @@ type SectionOpener interface {
 }
 
 func dataReadAt(d data.Data, b []byte, off int64) (n int, err error) {
+again:
        if ra, ok := d.(io.ReaderAt); ok {
                return ra.ReadAt(b, off)
        }
@@ -304,6 +307,10 @@ func dataReadAt(d data.Data, b []byte, off int64) (n int, err error) {
                defer rc.Close()
                return io.ReadFull(rc, b)
        }
+       if dp, ok := super(d); ok {
+               d = dp.(data.Data)
+               goto again
+       }
        panic(fmt.Sprintf("can't read from %T", d))
 }
 
@@ -355,14 +362,7 @@ func (cl *Client) prioritizePiece(t *torrent, piece int, priority piecePriority)
        }
        cl.queueFirstHash(t, piece)
        t.Pieces[piece].Priority = priority
-       if t.wantPiece(piece) {
-               for _, c := range t.Conns {
-                       if c.PeerHasPiece(pp.Integer(piece)) {
-                               t.connPendPiece(c, piece)
-                               cl.replenishConnRequests(t, c)
-                       }
-               }
-       }
+       cl.pieceChanged(t, piece)
 }
 
 func (cl *Client) setEnvBlocklist() (err error) {
@@ -2224,11 +2224,45 @@ func (me *Client) WaitAll() bool {
        return true
 }
 
+func (me *Client) fillRequests(t *torrent, c *connection) {
+       if c.Interested {
+               if c.PeerChoked {
+                       return
+               }
+               if len(c.Requests) > c.requestsLowWater {
+                       return
+               }
+       }
+       addRequest := func(req request) (again bool) {
+               if len(c.Requests) >= 32 {
+                       return false
+               }
+               return c.Request(req)
+       }
+       for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
+               pieceIndex := e.Piece()
+               if !c.PeerHasPiece(pp.Integer(pieceIndex)) {
+                       panic("piece in request order but peer doesn't have it")
+               }
+               if !t.wantPiece(pieceIndex) {
+                       panic("unwanted piece in connection request order")
+               }
+               piece := t.Pieces[pieceIndex]
+               for _, cs := range piece.shuffledPendingChunkSpecs() {
+                       r := request{pp.Integer(pieceIndex), cs}
+                       if !addRequest(r) {
+                               return
+                       }
+               }
+       }
+       return
+}
+
 func (me *Client) replenishConnRequests(t *torrent, c *connection) {
        if !t.haveInfo() {
                return
        }
-       me.downloadStrategy.FillRequests(t, c)
+       me.fillRequests(t, c)
        if len(c.Requests) == 0 && !c.PeerChoked {
                c.SetInterested(false)
        }
@@ -2248,7 +2282,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
        piece := t.Pieces[req.Index]
 
        // Do we actually want this chunk?
-       if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
+       if _, ok := piece.PendingChunkSpecs[req.chunkSpec]; !ok || piece.Priority == piecePriorityNone {
                unusedDownloadedChunksCount.Add(1)
                c.UnwantedChunksReceived++
                return nil
@@ -2298,10 +2332,15 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
                        }
                }
        }
+       me.pieceChanged(t, int(piece))
+}
+
+func (me *Client) pieceChanged(t *torrent, piece int) {
+       correct := t.pieceComplete(piece)
+       p := t.Pieces[piece]
        if correct {
                p.Priority = piecePriorityNone
                p.PendingChunkSpecs = nil
-               p.complete = true
                p.Event.Broadcast()
        } else {
                if len(p.PendingChunkSpecs) == 0 {
@@ -2344,7 +2383,7 @@ func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
                cl.event.Wait()
        }
        p.QueuedForHash = false
-       if t.isClosed() || p.complete {
+       if t.isClosed() || t.pieceComplete(int(index)) {
                return
        }
        p.Hashing = true
index 5802c18a15e15de2cc5c8b6b0d952658fd62a986..a3ade8e687a6dd08547159b200af65d6eca6f84f 100644 (file)
@@ -1,39 +1,15 @@
 package blob
 
 import (
-       "bytes"
-       "crypto/sha1"
        "encoding/hex"
-       "errors"
        "io"
-       "os"
-       "path/filepath"
-
-       dataPkg "bitbucket.org/anacrolix/go.torrent/data"
 
        "github.com/anacrolix/libtorgo/metainfo"
 )
 
-const (
-       filePerm = 0640
-       dirPerm  = 0750
-)
-
 type data struct {
-       info    *metainfo.Info
-       baseDir string
-}
-
-type store struct {
-       baseDir string
-}
-
-func (me store) OpenTorrent(info *metainfo.Info) dataPkg.Data {
-       return &data{info, me.baseDir}
-}
-
-func NewStore(baseDir string) dataPkg.Store {
-       return store{baseDir}
+       info  *metainfo.Info
+       store *store
 }
 
 func (me *data) pieceHashHex(i int) string {
@@ -42,45 +18,26 @@ func (me *data) pieceHashHex(i int) string {
 
 func (me *data) Close() {}
 
-func (me *data) ReadAt(p []byte, off int64) (n int, err error) {
-       hash := me.pieceHashHex(int(off / me.info.PieceLength))
-       f, err := os.Open(me.baseDir + "/complete/" + hash)
-       if os.IsNotExist(err) {
-               f, err = os.Open(me.baseDir + "/incomplete/" + hash)
-               if os.IsNotExist(err) {
-                       err = io.EOF
-                       return
-               }
-               if err != nil {
-                       return
-               }
-       } else if err != nil {
+func (me *data) ReadAt(b []byte, off int64) (n int, err error) {
+       p := me.info.Piece(int(off / me.info.PieceLength))
+       f := me.store.pieceRead(p)
+       if f == nil {
+               err = io.ErrUnexpectedEOF
                return
        }
        defer f.Close()
-       off %= me.info.PieceLength
-       return f.ReadAt(p, off)
-}
-
-func (me *data) openComplete(piece int) (f *os.File, err error) {
-       return os.OpenFile(me.baseDir+"/complete/"+me.pieceHashHex(piece), os.O_RDWR, 0660)
+       n, err = f.ReadAt(b, off%me.info.PieceLength)
+       if err == io.EOF {
+               err = io.ErrUnexpectedEOF
+       }
+       return
 }
 
 func (me *data) WriteAt(p []byte, off int64) (n int, err error) {
        i := int(off / me.info.PieceLength)
        off %= me.info.PieceLength
        for len(p) != 0 {
-               _, err = os.Stat(me.baseDir + "/complete/" + me.pieceHashHex(i))
-               if err == nil {
-                       err = errors.New("can't write to completed piece")
-                       return
-               }
-               os.MkdirAll(me.baseDir+"/incomplete", 0750)
-               var f *os.File
-               f, err = os.OpenFile(me.baseDir+"/incomplete/"+me.pieceHashHex(i), os.O_WRONLY|os.O_CREATE, 0640)
-               if err != nil {
-                       return
-               }
+               f := me.store.pieceWrite(me.info.Piece(i))
                p1 := p
                maxN := me.info.Piece(i).Length() - off
                if int64(len(p1)) > maxN {
@@ -95,28 +52,24 @@ func (me *data) WriteAt(p []byte, off int64) (n int, err error) {
                }
                p = p[n1:]
                off = 0
+               i++
        }
        return
 }
 
 func (me *data) pieceReader(piece int, off int64) (ret io.ReadCloser, err error) {
-       f, err := os.Open(me.completedPiecePath(piece))
-       if os.IsNotExist(err) {
-               f, err = os.Open(me.incompletePiecePath(piece))
-               if os.IsNotExist(err) {
-                       err = io.EOF
-                       return
-               }
-               if err != nil {
-                       return
-               }
-       } else if err != nil {
+       f := me.store.pieceRead(me.info.Piece(piece))
+       if f == nil {
+               err = io.ErrUnexpectedEOF
                return
        }
        return struct {
                io.Reader
                io.Closer
-       }{io.NewSectionReader(f, off, me.info.Piece(piece).Length()-off), f}, nil
+       }{
+               Reader: io.NewSectionReader(f, off, me.info.Piece(piece).Length()-off),
+               Closer: f,
+       }, nil
 }
 
 func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) {
@@ -130,59 +83,26 @@ func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err er
                                err = nil
                        }
                        return
+                       pr.Close()
                }
                var n1 int64
                n1, err = io.CopyN(w, pr, n)
+               pr.Close()
                written += n1
                n -= n1
                if err != nil {
                        return
                }
                off = 0
+               i++
        }
        return
 }
 
-func (me *data) incompletePiecePath(piece int) string {
-       return filepath.Join(me.baseDir, "incomplete", me.pieceHashHex(piece))
-}
-
-func (me *data) completedPiecePath(piece int) string {
-       return filepath.Join(me.baseDir, "complete", me.pieceHashHex(piece))
-}
-
 func (me *data) PieceCompleted(index int) (err error) {
-       var (
-               incompletePiecePath = me.incompletePiecePath(index)
-               completedPiecePath  = me.completedPiecePath(index)
-       )
-       fSrc, err := os.Open(incompletePiecePath)
-       if err != nil {
-               return
-       }
-       defer fSrc.Close()
-       os.MkdirAll(filepath.Dir(completedPiecePath), dirPerm)
-       fDst, err := os.OpenFile(completedPiecePath, os.O_EXCL|os.O_CREATE|os.O_WRONLY, filePerm)
-       if err != nil {
-               return
-       }
-       defer fDst.Close()
-       hasher := sha1.New()
-       r := io.TeeReader(io.LimitReader(fSrc, me.info.Piece(index).Length()), hasher)
-       _, err = io.Copy(fDst, r)
-       if err != nil {
-               return
-       }
-       if !bytes.Equal(hasher.Sum(nil), me.info.Piece(index).Hash()) {
-               err = errors.New("piece incomplete")
-               os.Remove(completedPiecePath)
-               return
-       }
-       os.Remove(incompletePiecePath)
-       return
+       return me.store.PieceCompleted(me.info.Piece(index))
 }
 
 func (me *data) PieceComplete(piece int) bool {
-       _, err := os.Stat(me.completedPiecePath(piece))
-       return err == nil
+       return me.store.pieceComplete(me.info.Piece(piece))
 }
diff --git a/data/blob/store.go b/data/blob/store.go
new file mode 100644 (file)
index 0000000..766129f
--- /dev/null
@@ -0,0 +1,195 @@
+package blob
+
+import (
+       "bytes"
+       "crypto/sha1"
+       "encoding/hex"
+       "errors"
+       "fmt"
+       "io"
+       "math/rand"
+       "os"
+       "path/filepath"
+
+       dataPkg "bitbucket.org/anacrolix/go.torrent/data"
+       "github.com/anacrolix/libtorgo/metainfo"
+)
+
+const (
+       filePerm = 0640
+       dirPerm  = 0750
+)
+
+type store struct {
+       baseDir   string
+       capacity  int64
+       completed map[string]struct{}
+}
+
+func (me *store) OpenTorrent(info *metainfo.Info) dataPkg.Data {
+       return &data{info, me}
+}
+
+type StoreOption func(*store)
+
+func Capacity(bytes int64) StoreOption {
+       return func(s *store) {
+               s.capacity = bytes
+       }
+}
+
+func NewStore(baseDir string, opt ...StoreOption) dataPkg.Store {
+       s := &store{baseDir, -1, nil}
+       for _, o := range opt {
+               o(s)
+       }
+       s.initCompleted()
+       return s
+}
+
+func (me *store) initCompleted() {
+       fis, err := me.readCompletedDir()
+       if err != nil {
+               panic(err)
+       }
+       me.completed = make(map[string]struct{}, len(fis))
+       for _, fi := range fis {
+               me.completed[fi.Name()] = struct{}{}
+       }
+}
+
+func (me *store) completePieceDirPath() string {
+       return filepath.Join(me.baseDir, "complete")
+}
+
+func (me *store) path(p metainfo.Piece, completed bool) string {
+       return filepath.Join(me.baseDir, func() string {
+               if completed {
+                       return "complete"
+               } else {
+                       return "incomplete"
+               }
+       }(), fmt.Sprintf("%x", p.Hash()))
+}
+
+func (me *store) pieceComplete(p metainfo.Piece) bool {
+       _, ok := me.completed[hex.EncodeToString(p.Hash())]
+       return ok
+}
+
+func (me *store) pieceWrite(p metainfo.Piece) (f *os.File) {
+       if me.pieceComplete(p) {
+               return
+       }
+       name := me.path(p, false)
+       os.MkdirAll(filepath.Dir(name), dirPerm)
+       f, err := os.OpenFile(name, os.O_CREATE|os.O_WRONLY, filePerm)
+       if err != nil {
+               panic(err)
+       }
+       return
+}
+
+func (me *store) pieceRead(p metainfo.Piece) (f *os.File) {
+       f, err := os.Open(me.path(p, true))
+       if err == nil {
+               return
+       }
+       if !os.IsNotExist(err) {
+               panic(err)
+       }
+       f, err = os.Open(me.path(p, false))
+       if err == nil {
+               return
+       }
+       if !os.IsNotExist(err) {
+               panic(err)
+       }
+       return
+}
+
+func (me *store) readCompletedDir() (fis []os.FileInfo, err error) {
+       f, err := os.Open(me.completePieceDirPath())
+       if err != nil {
+               if os.IsNotExist(err) {
+                       err = nil
+               }
+               return
+       }
+       fis, err = f.Readdir(-1)
+       f.Close()
+       return
+}
+
+func (me *store) removeCompleted(name string) (err error) {
+       err = os.Remove(filepath.Join(me.completePieceDirPath(), name))
+       if os.IsNotExist(err) {
+               err = nil
+       }
+       if err != nil {
+               return err
+       }
+       delete(me.completed, name)
+       return
+}
+
+func (me *store) makeSpace(space int64) error {
+       if me.capacity < 0 {
+               return nil
+       }
+       if space > me.capacity {
+               return errors.New("space requested exceeds capacity")
+       }
+       fis, err := me.readCompletedDir()
+       if err != nil {
+               return err
+       }
+       var size int64
+       for _, fi := range fis {
+               size += fi.Size()
+       }
+       for size > me.capacity-space {
+               i := rand.Intn(len(fis))
+               me.removeCompleted(fis[i].Name())
+               size -= fis[i].Size()
+               fis[i] = fis[len(fis)-1]
+               fis = fis[:len(fis)-1]
+       }
+       return nil
+}
+
+func (me *store) PieceCompleted(p metainfo.Piece) (err error) {
+       err = me.makeSpace(p.Length())
+       if err != nil {
+               return
+       }
+       var (
+               incompletePiecePath = me.path(p, false)
+               completedPiecePath  = me.path(p, true)
+       )
+       fSrc, err := os.Open(incompletePiecePath)
+       if err != nil {
+               return
+       }
+       defer fSrc.Close()
+       os.MkdirAll(filepath.Dir(completedPiecePath), dirPerm)
+       fDst, err := os.OpenFile(completedPiecePath, os.O_EXCL|os.O_CREATE|os.O_WRONLY, filePerm)
+       if err != nil {
+               return
+       }
+       defer fDst.Close()
+       hasher := sha1.New()
+       r := io.TeeReader(io.LimitReader(fSrc, p.Length()), hasher)
+       _, err = io.Copy(fDst, r)
+       if err != nil {
+               return
+       }
+       if !bytes.Equal(hasher.Sum(nil), p.Hash()) {
+               err = errors.New("piece incomplete")
+               os.Remove(completedPiecePath)
+               return
+       }
+       os.Remove(incompletePiecePath)
+       me.completed[hex.EncodeToString(p.Hash())] = struct{}{}
+       return
+}
diff --git a/download_strategies.go b/download_strategies.go
deleted file mode 100644 (file)
index fdf9e24..0000000
+++ /dev/null
@@ -1,77 +0,0 @@
-package torrent
-
-import (
-       "io"
-
-       pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
-)
-
-type downloadStrategy interface {
-       // Tops up the outgoing pending requests.
-       FillRequests(*torrent, *connection)
-       TorrentStarted(*torrent)
-       TorrentStopped(*torrent)
-       DeleteRequest(*torrent, request)
-       TorrentPrioritize(t *torrent, off, _len int64)
-       TorrentGotChunk(*torrent, request)
-       TorrentGotPiece(t *torrent, piece int)
-       WriteStatus(w io.Writer)
-       AssertNotRequested(*torrent, request)
-       PendingData(*torrent) bool
-}
-
-type defaultDownloadStrategy struct{}
-
-func (me *defaultDownloadStrategy) PendingData(t *torrent) bool {
-       return !t.haveAllPieces()
-}
-
-func (me *defaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {}
-
-func (me *defaultDownloadStrategy) WriteStatus(w io.Writer) {}
-
-func (s *defaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
-       if c.Interested {
-               if c.PeerChoked {
-                       return
-               }
-               if len(c.Requests) > c.requestsLowWater {
-                       return
-               }
-       }
-       addRequest := func(req request) (again bool) {
-               if len(c.Requests) >= 32 {
-                       return false
-               }
-               return c.Request(req)
-       }
-       for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
-               pieceIndex := e.Piece()
-               if !c.PeerHasPiece(pp.Integer(pieceIndex)) {
-                       panic("piece in request order but peer doesn't have it")
-               }
-               if !t.wantPiece(pieceIndex) {
-                       panic("unwanted piece in connection request order")
-               }
-               piece := t.Pieces[pieceIndex]
-               for _, cs := range piece.shuffledPendingChunkSpecs() {
-                       r := request{pp.Integer(pieceIndex), cs}
-                       if !addRequest(r) {
-                               return
-                       }
-               }
-       }
-       return
-}
-
-func (s *defaultDownloadStrategy) TorrentStarted(t *torrent) {}
-
-func (s *defaultDownloadStrategy) TorrentStopped(t *torrent) {
-}
-
-func (s *defaultDownloadStrategy) DeleteRequest(t *torrent, r request) {
-}
-
-func (me *defaultDownloadStrategy) TorrentGotChunk(t *torrent, c request)      {}
-func (me *defaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int)      {}
-func (*defaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {}
diff --git a/misc.go b/misc.go
index b47198b2276346a06fa2cbb728c022f72dfb6639..c63fa6ebc253a22565625e78440c0bddfa7a2e80 100644 (file)
--- a/misc.go
+++ b/misc.go
@@ -45,7 +45,6 @@ const (
 
 type piece struct {
        Hash              pieceSum
-       complete          bool
        PendingChunkSpecs map[chunkSpec]struct{}
        Hashing           bool
        QueuedForHash     bool
@@ -72,10 +71,6 @@ func (p *piece) shuffledPendingChunkSpecs() (css []chunkSpec) {
        return
 }
 
-func (p *piece) Complete() bool {
-       return p.complete
-}
-
 func lastChunkSpec(pieceLength peer_protocol.Integer) (cs chunkSpec) {
        cs.Begin = (pieceLength - 1) / chunkSize * chunkSize
        cs.Length = pieceLength - cs.Begin
@@ -108,3 +103,18 @@ func metadataPieceSize(totalSize int, piece int) int {
        }
        return ret
 }
+
+type Super interface {
+       Super() interface{}
+}
+
+// Returns ok if there's a parent, and it's not nil.
+func super(child interface{}) (parent interface{}, ok bool) {
+       s, ok := child.(Super)
+       if !ok {
+               return
+       }
+       parent = s.Super()
+       ok = parent != nil
+       return
+}
diff --git a/stateless.go b/stateless.go
new file mode 100644 (file)
index 0000000..3caa495
--- /dev/null
@@ -0,0 +1,21 @@
+package torrent
+
+import "bitbucket.org/anacrolix/go.torrent/data"
+
+type statelessDataWrapper struct {
+       data.Data
+       complete []bool
+}
+
+func (me *statelessDataWrapper) PieceComplete(piece int) bool {
+       return me.complete[piece]
+}
+
+func (me *statelessDataWrapper) PieceCompleted(piece int) error {
+       me.complete[piece] = true
+       return nil
+}
+
+func (me *statelessDataWrapper) Super() interface{} {
+       return me.Data
+}
index 751de60b97d98f7b7a2872b1079d6bd247a88627..4217efa16a896a41b1ccee918fa582bccb7bb7d1 100644 (file)
@@ -12,6 +12,8 @@ import (
        "sync"
        "time"
 
+       "github.com/bradfitz/iter"
+
        "bitbucket.org/anacrolix/go.torrent/data"
        pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
        "bitbucket.org/anacrolix/go.torrent/tracker"
@@ -20,11 +22,11 @@ import (
        "github.com/anacrolix/libtorgo/metainfo"
 )
 
-func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) {
-       piece := t.Pieces[index]
-       if piece.complete {
+func (t *torrent) PieceNumPendingBytes(index int) (count pp.Integer) {
+       if t.pieceComplete(index) {
                return 0
        }
+       piece := t.Pieces[index]
        if !piece.EverHashed {
                return t.PieceLength(index)
        }
@@ -66,7 +68,7 @@ type torrent struct {
        Pieces   []*piece
        length   int64
 
-       data data.Data
+       data StatefulData
 
        Info *metainfo.Info
        // Active peer connections.
@@ -93,6 +95,12 @@ type torrent struct {
        pruneTimer *time.Timer
 }
 
+func (t *torrent) pieceComplete(piece int) bool {
+       // TODO: This is called when setting metadata, and before storage is
+       // assigned, which doesn't seem right.
+       return t.data != nil && t.data.PieceComplete(piece)
+}
+
 // A file-like handle to torrent data that implements SectionOpener. Opened
 // sections will be reused so long as Reads and ReadAt's are contiguous.
 type handle struct {
@@ -301,11 +309,10 @@ func (t *torrent) setStorage(td data.Data) (err error) {
        if c, ok := t.data.(io.Closer); ok {
                c.Close()
        }
-       t.data = td
-       if sd, ok := t.data.(StatefulData); ok {
-               for i, p := range t.Pieces {
-                       p.complete = sd.PieceComplete(i)
-               }
+       if sd, ok := td.(StatefulData); ok {
+               t.data = sd
+       } else {
+               t.data = &statelessDataWrapper{td, make([]bool, t.Info.NumPieces())}
        }
        return
 }
@@ -351,7 +358,7 @@ func (t *torrent) Name() string {
 func (t *torrent) pieceStatusChar(index int) byte {
        p := t.Pieces[index]
        switch {
-       case p.Complete():
+       case t.pieceComplete(index):
                return 'C'
        case p.QueuedForHash:
                return 'Q'
@@ -550,8 +557,8 @@ func (t *torrent) numPieces() int {
 }
 
 func (t *torrent) numPiecesCompleted() (num int) {
-       for _, p := range t.Pieces {
-               if p.Complete() {
+       for i := range iter.N(t.Info.NumPieces()) {
+               if t.pieceComplete(i) {
                        num++
                }
        }
@@ -689,8 +696,8 @@ func (t *torrent) haveAllPieces() bool {
        if !t.haveInfo() {
                return false
        }
-       for _, piece := range t.Pieces {
-               if !piece.Complete() {
+       for i := range t.Pieces {
+               if !t.pieceComplete(i) {
                        return false
                }
        }
@@ -698,8 +705,8 @@ func (t *torrent) haveAllPieces() bool {
 }
 
 func (me *torrent) haveAnyPieces() bool {
-       for _, piece := range me.Pieces {
-               if piece.Complete() {
+       for i := range me.Pieces {
+               if me.pieceComplete(i) {
                        return true
                }
        }
@@ -707,7 +714,7 @@ func (me *torrent) haveAnyPieces() bool {
 }
 
 func (t *torrent) havePiece(index int) bool {
-       return t.haveInfo() && t.Pieces[index].Complete()
+       return t.haveInfo() && t.pieceComplete(index)
 }
 
 func (t *torrent) haveChunk(r request) bool {
@@ -732,7 +739,7 @@ func (t *torrent) wantPiece(index int) bool {
                return false
        }
        p := t.Pieces[index]
-       return p.EverHashed && len(p.PendingChunkSpecs) != 0 && p.Priority != piecePriorityNone
+       return !t.pieceComplete(index) && p.Priority != piecePriorityNone && !p.QueuedForHash && !p.Hashing
 }
 
 func (t *torrent) connHasWantedPieces(c *connection) bool {