err = io.EOF
return
}
- piece := t.Pieces[index]
pieceOff := pp.Integer(off % int64(t.usualPieceSize()))
pieceLeft := int(t.PieceLength(pp.Integer(index)) - pieceOff)
if pieceLeft <= 0 {
err = io.EOF
return
}
- cl.readRaisePiecePriorities(t, off, int64(len(p)))
if len(p) > pieceLeft {
p = p[:pieceLeft]
}
if len(p) == 0 {
panic(len(p))
}
+ cl.prepareRead(t, off)
+ return dataReadAt(t.data, p, off)
+}
+
+// Sets priorities to download from the given offset. Returns when the piece
+// at the given offset can be read. Returns the number of bytes that
+// immediately available from the offset.
+func (cl *Client) prepareRead(t *torrent, off int64) (n int64) {
+ index := int(off / int64(t.usualPieceSize()))
+ // Reading outside the bounds of a file is an error.
+ if index < 0 || index >= t.numPieces() {
+ return
+ }
+ piece := t.Pieces[index]
+ cl.readRaisePiecePriorities(t, off)
for !piece.Complete() && !t.isClosed() {
piece.Event.Wait()
}
- return t.data.ReadAt(p, off)
+ return t.Info.Piece(index).Length() - off%t.Info.PieceLength
}
-func (cl *Client) readRaisePiecePriorities(t *torrent, off, _len int64) {
+func (T Torrent) prepareRead(off int64) (avail int64) {
+ T.cl.mu.Lock()
+ defer T.cl.mu.Unlock()
+ return T.cl.prepareRead(T.torrent, off)
+}
+
+// Data implements a streaming interface that's more efficient than ReadAt.
+type SectionOpener interface {
+ OpenSection(off, n int64) (io.ReadCloser, error)
+}
+
+func dataReadAt(d Data, b []byte, off int64) (n int, err error) {
+ if ra, ok := d.(io.ReaderAt); ok {
+ return ra.ReadAt(b, off)
+ }
+ if so, ok := d.(SectionOpener); ok {
+ var rc io.ReadCloser
+ rc, err = so.OpenSection(off, int64(len(b)))
+ if err != nil {
+ return
+ }
+ defer rc.Close()
+ return io.ReadFull(rc, b)
+ }
+ panic(fmt.Sprintf("can't read from %T", d))
+}
+
+func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) {
index := int(off / int64(t.usualPieceSize()))
cl.raisePiecePriority(t, index, piecePriorityNow)
index++
// routine.
// c.PeerRequests[request] = struct{}{}
p := make([]byte, msg.Length)
- n, err := t.data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
+ n, err := dataReadAt(t.data, p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
if err != nil {
return fmt.Errorf("reading t data to serve request %q: %s", request, err)
}
type TorrentDataOpener func(*metainfo.Info) (StatelessData, error)
-type statelessDataWrapper struct {
- StatelessData
-}
-
-func (statelessDataWrapper) PieceComplete(int) bool { return false }
-func (statelessDataWrapper) PieceCompleted(int) error { return nil }
-
-var _ Data = statelessDataWrapper{}
-
func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
err = t.setMetadata(md, bytes, &cl.mu)
if err != nil {
return
}
close(t.gotMetainfo)
- stateless, err := cl.torrentDataOpener(&md)
+ td, err := cl.torrentDataOpener(&md)
if err != nil {
return
}
- td, ok := stateless.(Data)
- if !ok {
- td = statelessDataWrapper{stateless}
- }
err = cl.setStorage(t, td)
return
}
return f.path
}
+type Handle interface {
+ io.Reader
+ io.Seeker
+ io.Closer
+}
+
+// Implements a Handle within a subsection of another Handle.
+type sectionHandle struct {
+ h Handle
+ off, n, cur int64
+}
+
+func (me *sectionHandle) Seek(offset int64, whence int) (ret int64, err error) {
+ if whence == 0 {
+ offset += me.off
+ } else if whence == 2 {
+ whence = 0
+ offset = me.off + me.n
+ }
+ ret, err = me.h.Seek(offset, whence)
+ ret -= me.off
+ return
+}
+
+func (me *sectionHandle) Close() error {
+ return me.h.Close()
+}
+
+func (me *sectionHandle) Read(b []byte) (n int, err error) {
+ max := me.off + me.n - me.cur
+ if int64(len(b)) > max {
+ b = b[:max]
+ }
+ n, err = me.h.Read(b)
+ me.cur += int64(n)
+ if err != nil {
+ return
+ }
+ if me.cur == me.off+me.n {
+ err = io.EOF
+ }
+ return
+}
+
+func (f File) Open() (h Handle, err error) {
+ h = f.t.NewReadHandle()
+ _, err = h.Seek(f.offset, os.SEEK_SET)
+ if err != nil {
+ h.Close()
+ return
+ }
+ h = §ionHandle{h, f.offset, f.Length(), f.offset}
+ return
+}
+
func (f File) ReadAt(p []byte, off int64) (n int, err error) {
maxLen := f.length - off
if int64(len(p)) > maxLen {
// Returns handles to the files in the torrent. This requires the metainfo is
// available first.
func (t Torrent) Files() (ret []File) {
- select {
- case <-t.GotMetainfo:
- default:
+ t.cl.mu.Lock()
+ info := t.Info
+ t.cl.mu.Unlock()
+ if info == nil {
return
}
var offset int64
- for _, fi := range t.Info.UpvertedFiles() {
+ for _, fi := range info.UpvertedFiles() {
ret = append(ret, File{
t,
- strings.Join(append([]string{t.Info.Name}, fi.Path...), "/"),
+ strings.Join(append([]string{info.Name}, fi.Path...), "/"),
offset,
fi.Length,
fi,
}
p.EverHashed = true
if correct {
- err := t.data.PieceCompleted(int(piece))
- if err != nil {
- log.Printf("error completing piece: %s", err)
- correct = false
+ if sd, ok := t.data.(StatefulData); ok {
+ err := sd.PieceCompleted(int(piece))
+ if err != nil {
+ log.Printf("error completing piece: %s", err)
+ correct = false
+ }
}
}
if correct {
import (
"container/heap"
+ "errors"
"fmt"
"io"
"log"
func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) {
piece := t.Pieces[index]
+ if piece.complete {
+ return 0
+ }
if !piece.EverHashed {
return t.PieceLength(index)
}
}
type StatelessData interface {
- ReadAt(p []byte, off int64) (n int, err error)
- Close()
+ // OpenSection(off, n int64) (io.ReadCloser, error)
+ // ReadAt(p []byte, off int64) (n int, err error)
+ // Close()
WriteAt(p []byte, off int64) (n int, err error)
WriteSectionTo(w io.Writer, off, n int64) (written int64, err error)
}
+// Represents data storage for a Torrent. Additional optional interfaces to
+// implement are io.Closer, io.ReaderAt, StatefulData, and SectionOpener.
type Data interface {
StatelessData
+}
+
+// Data maintains per-piece persistent state.
+type StatefulData interface {
+ Data
// We believe the piece data will pass a hash check.
PieceCompleted(index int) error
// Returns true if the piece is complete.
pruneTimer *time.Timer
}
+// A file-like handle to torrent data that implements SectionOpener. Opened
+// sections are be reused so long as Reads are contiguous.
+type handle struct {
+ rc io.ReadCloser
+ curOff int64
+ so SectionOpener
+ size int64
+ t Torrent
+}
+
+func (h *handle) Close() error {
+ if h.rc != nil {
+ return h.rc.Close()
+ }
+ return nil
+}
+
+func (h *handle) Read(b []byte) (n int, err error) {
+ max := h.t.prepareRead(h.curOff)
+ if int64(len(b)) > max {
+ b = b[:max]
+ }
+ if h.rc == nil {
+ h.rc, err = h.so.OpenSection(h.curOff, h.size-h.curOff)
+ if err != nil {
+ return
+ }
+ }
+ n, err = h.rc.Read(b)
+ h.curOff += int64(n)
+ return
+}
+
+func (h *handle) Seek(off int64, whence int) (newOff int64, err error) {
+ switch whence {
+ case 0:
+ newOff = off
+ case 1:
+ newOff += off
+ case 2:
+ newOff = h.size + off
+ default:
+ err = errors.New("bad whence")
+ }
+ if newOff == h.curOff {
+ return
+ }
+ h.curOff = newOff
+ if h.rc != nil {
+ h.Close()
+ h.rc = nil
+ }
+ return
+}
+
+// Implements Handle on top of an io.SectionReader.
+type sectionReaderHandle struct {
+ *io.SectionReader
+}
+
+func (sectionReaderHandle) Close() error { return nil }
+
+func (T Torrent) NewReadHandle() Handle {
+ if so, ok := T.data.(SectionOpener); ok {
+ return &handle{
+ so: so,
+ size: T.Length(),
+ t: T,
+ }
+ }
+ return sectionReaderHandle{io.NewSectionReader(T, 0, T.Length())}
+}
+
func (t *torrent) numConnsUnchoked() (num int) {
for _, c := range t.Conns {
if !c.PeerChoked {
}
func (t *torrent) setStorage(td Data) (err error) {
- if t.data != nil {
- t.data.Close()
+ if c, ok := t.data.(io.Closer); ok {
+ c.Close()
}
t.data = td
- for i, p := range t.Pieces {
- p.complete = t.data.PieceComplete(i)
+ if sd, ok := t.data.(StatefulData); ok {
+ for i, p := range t.Pieces {
+ p.complete = sd.PieceComplete(i)
+ }
}
return
}
}
t.ceaseNetworking()
close(t.closing)
- if t.data != nil {
- t.data.Close()
+ if c, ok := t.data.(io.Closer); ok {
+ c.Close()
}
for _, conn := range t.Conns {
conn.Close()