]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Support opening handles to Torrent and File
authorMatt Joiner <anacrolix@gmail.com>
Sun, 1 Mar 2015 03:32:54 +0000 (14:32 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 1 Mar 2015 03:32:54 +0000 (14:32 +1100)
client.go
data/blob/blob.go
data/file/file.go
torrent.go

index b96f67d0350490625a748918926c2e079d07c170..648dbd9ca6f9b42b4e132620de4264b0fd7b288b 100644 (file)
--- a/client.go
+++ b/client.go
@@ -245,27 +245,67 @@ func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err err
                err = io.EOF
                return
        }
-       piece := t.Pieces[index]
        pieceOff := pp.Integer(off % int64(t.usualPieceSize()))
        pieceLeft := int(t.PieceLength(pp.Integer(index)) - pieceOff)
        if pieceLeft <= 0 {
                err = io.EOF
                return
        }
-       cl.readRaisePiecePriorities(t, off, int64(len(p)))
        if len(p) > pieceLeft {
                p = p[:pieceLeft]
        }
        if len(p) == 0 {
                panic(len(p))
        }
+       cl.prepareRead(t, off)
+       return dataReadAt(t.data, p, off)
+}
+
+// Sets priorities to download from the given offset. Returns when the piece
+// at the given offset can be read. Returns the number of bytes that
+// immediately available from the offset.
+func (cl *Client) prepareRead(t *torrent, off int64) (n int64) {
+       index := int(off / int64(t.usualPieceSize()))
+       // Reading outside the bounds of a file is an error.
+       if index < 0 || index >= t.numPieces() {
+               return
+       }
+       piece := t.Pieces[index]
+       cl.readRaisePiecePriorities(t, off)
        for !piece.Complete() && !t.isClosed() {
                piece.Event.Wait()
        }
-       return t.data.ReadAt(p, off)
+       return t.Info.Piece(index).Length() - off%t.Info.PieceLength
 }
 
-func (cl *Client) readRaisePiecePriorities(t *torrent, off, _len int64) {
+func (T Torrent) prepareRead(off int64) (avail int64) {
+       T.cl.mu.Lock()
+       defer T.cl.mu.Unlock()
+       return T.cl.prepareRead(T.torrent, off)
+}
+
+// Data implements a streaming interface that's more efficient than ReadAt.
+type SectionOpener interface {
+       OpenSection(off, n int64) (io.ReadCloser, error)
+}
+
+func dataReadAt(d Data, b []byte, off int64) (n int, err error) {
+       if ra, ok := d.(io.ReaderAt); ok {
+               return ra.ReadAt(b, off)
+       }
+       if so, ok := d.(SectionOpener); ok {
+               var rc io.ReadCloser
+               rc, err = so.OpenSection(off, int64(len(b)))
+               if err != nil {
+                       return
+               }
+               defer rc.Close()
+               return io.ReadFull(rc, b)
+       }
+       panic(fmt.Sprintf("can't read from %T", d))
+}
+
+func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) {
        index := int(off / int64(t.usualPieceSize()))
        cl.raisePiecePriority(t, index, piecePriorityNow)
        index++
@@ -1185,7 +1225,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
                        // routine.
                        // c.PeerRequests[request] = struct{}{}
                        p := make([]byte, msg.Length)
-                       n, err := t.data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
+                       n, err := dataReadAt(t.data, p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
                        if err != nil {
                                return fmt.Errorf("reading t data to serve request %q: %s", request, err)
                        }
@@ -1555,15 +1595,6 @@ func (cl *Client) setStorage(t *torrent, td Data) (err error) {
 
 type TorrentDataOpener func(*metainfo.Info) (StatelessData, error)
 
-type statelessDataWrapper struct {
-       StatelessData
-}
-
-func (statelessDataWrapper) PieceComplete(int) bool   { return false }
-func (statelessDataWrapper) PieceCompleted(int) error { return nil }
-
-var _ Data = statelessDataWrapper{}
-
 func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
        err = t.setMetadata(md, bytes, &cl.mu)
        if err != nil {
@@ -1580,14 +1611,10 @@ func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err e
                return
        }
        close(t.gotMetainfo)
-       stateless, err := cl.torrentDataOpener(&md)
+       td, err := cl.torrentDataOpener(&md)
        if err != nil {
                return
        }
-       td, ok := stateless.(Data)
-       if !ok {
-               td = statelessDataWrapper{stateless}
-       }
        err = cl.setStorage(t, td)
        return
 }
@@ -1695,6 +1722,61 @@ func (f File) Path() string {
        return f.path
 }
 
+type Handle interface {
+       io.Reader
+       io.Seeker
+       io.Closer
+}
+
+// Implements a Handle within a subsection of another Handle.
+type sectionHandle struct {
+       h           Handle
+       off, n, cur int64
+}
+
+func (me *sectionHandle) Seek(offset int64, whence int) (ret int64, err error) {
+       if whence == 0 {
+               offset += me.off
+       } else if whence == 2 {
+               whence = 0
+               offset = me.off + me.n
+       }
+       ret, err = me.h.Seek(offset, whence)
+       ret -= me.off
+       return
+}
+
+func (me *sectionHandle) Close() error {
+       return me.h.Close()
+}
+
+func (me *sectionHandle) Read(b []byte) (n int, err error) {
+       max := me.off + me.n - me.cur
+       if int64(len(b)) > max {
+               b = b[:max]
+       }
+       n, err = me.h.Read(b)
+       me.cur += int64(n)
+       if err != nil {
+               return
+       }
+       if me.cur == me.off+me.n {
+               err = io.EOF
+       }
+       return
+}
+
+func (f File) Open() (h Handle, err error) {
+       h = f.t.NewReadHandle()
+       _, err = h.Seek(f.offset, os.SEEK_SET)
+       if err != nil {
+               h.Close()
+               return
+       }
+       h = &sectionHandle{h, f.offset, f.Length(), f.offset}
+       return
+}
+
 func (f File) ReadAt(p []byte, off int64) (n int, err error) {
        maxLen := f.length - off
        if int64(len(p)) > maxLen {
@@ -1745,16 +1827,17 @@ func (f *File) PrioritizeRegion(off, len int64) {
 // Returns handles to the files in the torrent. This requires the metainfo is
 // available first.
 func (t Torrent) Files() (ret []File) {
-       select {
-       case <-t.GotMetainfo:
-       default:
+       t.cl.mu.Lock()
+       info := t.Info
+       t.cl.mu.Unlock()
+       if info == nil {
                return
        }
        var offset int64
-       for _, fi := range t.Info.UpvertedFiles() {
+       for _, fi := range info.UpvertedFiles() {
                ret = append(ret, File{
                        t,
-                       strings.Join(append([]string{t.Info.Name}, fi.Path...), "/"),
+                       strings.Join(append([]string{info.Name}, fi.Path...), "/"),
                        offset,
                        fi.Length,
                        fi,
@@ -2219,10 +2302,12 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
        }
        p.EverHashed = true
        if correct {
-               err := t.data.PieceCompleted(int(piece))
-               if err != nil {
-                       log.Printf("error completing piece: %s", err)
-                       correct = false
+               if sd, ok := t.data.(StatefulData); ok {
+                       err := sd.PieceCompleted(int(piece))
+                       if err != nil {
+                               log.Printf("error completing piece: %s", err)
+                               correct = false
+                       }
                }
        }
        if correct {
index f8ece667a8a17c71cf57c85b42e17048e787075b..2f871d6fe69c2b254943eb85ccabbd9a9180e4ff 100644 (file)
@@ -90,10 +90,9 @@ func (me *data) WriteAt(p []byte, off int64) (n int, err error) {
 }
 
 func (me *data) pieceReader(piece int, off int64) (ret io.ReadCloser, err error) {
-       hash := me.pieceHashHex(piece)
-       f, err := os.Open(me.baseDir + "/complete/" + hash)
+       f, err := os.Open(me.completedPiecePath(piece))
        if os.IsNotExist(err) {
-               f, err = os.Open(me.baseDir + "/incomplete/" + hash)
+               f, err = os.Open(me.incompletePiecePath(piece))
                if os.IsNotExist(err) {
                        err = io.EOF
                        return
index 5e6608defb6f178cbe8111f26214f68af583d9c2..4a22e25dbaf445422c2f22716dd1d8bbbbacf01b 100644 (file)
@@ -2,6 +2,7 @@ package file
 
 import (
        "io"
+       "io/ioutil"
        "os"
        "path/filepath"
 
@@ -47,8 +48,6 @@ func (me data) ReadAt(p []byte, off int64) (n int, err error) {
        return
 }
 
-func (me data) Close() {}
-
 func (me data) WriteAt(p []byte, off int64) (n int, err error) {
        for _, fi := range me.info.UpvertedFiles() {
                if off >= fi.Length {
index 5217db5646e275839d6b1be5d390dcf6e403341f..890d77ac11421d4d31702f01e32ca457bd3c1c7e 100644 (file)
@@ -2,6 +2,7 @@ package torrent
 
 import (
        "container/heap"
+       "errors"
        "fmt"
        "io"
        "log"
@@ -19,6 +20,9 @@ import (
 
 func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) {
        piece := t.Pieces[index]
+       if piece.complete {
+               return 0
+       }
        if !piece.EverHashed {
                return t.PieceLength(index)
        }
@@ -39,14 +43,22 @@ type peersKey struct {
 }
 
 type StatelessData interface {
-       ReadAt(p []byte, off int64) (n int, err error)
-       Close()
+       // OpenSection(off, n int64) (io.ReadCloser, error)
+       // ReadAt(p []byte, off int64) (n int, err error)
+       // Close()
        WriteAt(p []byte, off int64) (n int, err error)
        WriteSectionTo(w io.Writer, off, n int64) (written int64, err error)
 }
 
+// Represents data storage for a Torrent. Additional optional interfaces to
+// implement are io.Closer, io.ReaderAt, StatefulData, and SectionOpener.
 type Data interface {
        StatelessData
+}
+
+// Data maintains per-piece persistent state.
+type StatefulData interface {
+       Data
        // We believe the piece data will pass a hash check.
        PieceCompleted(index int) error
        // Returns true if the piece is complete.
@@ -93,6 +105,79 @@ type torrent struct {
        pruneTimer *time.Timer
 }
 
+// A file-like handle to torrent data that implements SectionOpener. Opened
+// sections are be reused so long as Reads are contiguous.
+type handle struct {
+       rc     io.ReadCloser
+       curOff int64
+       so     SectionOpener
+       size   int64
+       t      Torrent
+}
+
+func (h *handle) Close() error {
+       if h.rc != nil {
+               return h.rc.Close()
+       }
+       return nil
+}
+
+func (h *handle) Read(b []byte) (n int, err error) {
+       max := h.t.prepareRead(h.curOff)
+       if int64(len(b)) > max {
+               b = b[:max]
+       }
+       if h.rc == nil {
+               h.rc, err = h.so.OpenSection(h.curOff, h.size-h.curOff)
+               if err != nil {
+                       return
+               }
+       }
+       n, err = h.rc.Read(b)
+       h.curOff += int64(n)
+       return
+}
+
+func (h *handle) Seek(off int64, whence int) (newOff int64, err error) {
+       switch whence {
+       case 0:
+               newOff = off
+       case 1:
+               newOff += off
+       case 2:
+               newOff = h.size + off
+       default:
+               err = errors.New("bad whence")
+       }
+       if newOff == h.curOff {
+               return
+       }
+       h.curOff = newOff
+       if h.rc != nil {
+               h.Close()
+               h.rc = nil
+       }
+       return
+}
+
+// Implements Handle on top of an io.SectionReader.
+type sectionReaderHandle struct {
+       *io.SectionReader
+}
+
+func (sectionReaderHandle) Close() error { return nil }
+
+func (T Torrent) NewReadHandle() Handle {
+       if so, ok := T.data.(SectionOpener); ok {
+               return &handle{
+                       so:   so,
+                       size: T.Length(),
+                       t:    T,
+               }
+       }
+       return sectionReaderHandle{io.NewSectionReader(T, 0, T.Length())}
+}
+
 func (t *torrent) numConnsUnchoked() (num int) {
        for _, c := range t.Conns {
                if !c.PeerChoked {
@@ -215,12 +300,14 @@ func (t *torrent) setMetadata(md metainfo.Info, infoBytes []byte, eventLocker sy
 }
 
 func (t *torrent) setStorage(td Data) (err error) {
-       if t.data != nil {
-               t.data.Close()
+       if c, ok := t.data.(io.Closer); ok {
+               c.Close()
        }
        t.data = td
-       for i, p := range t.Pieces {
-               p.complete = t.data.PieceComplete(i)
+       if sd, ok := t.data.(StatefulData); ok {
+               for i, p := range t.Pieces {
+                       p.complete = sd.PieceComplete(i)
+               }
        }
        return
 }
@@ -490,8 +577,8 @@ func (t *torrent) close() (err error) {
        }
        t.ceaseNetworking()
        close(t.closing)
-       if t.data != nil {
-               t.data.Close()
+       if c, ok := t.data.(io.Closer); ok {
+               c.Close()
        }
        for _, conn := range t.Conns {
                conn.Close()