From: Matt Joiner Date: Sun, 1 Mar 2015 03:32:54 +0000 (+1100) Subject: Support opening handles to Torrent and File X-Git-Tag: v1.0.0~1305 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=4eb70e0673e4c09df92668dda794f03df3f78c2d;p=btrtrc.git Support opening handles to Torrent and File --- diff --git a/client.go b/client.go index b96f67d0..648dbd9c 100644 --- a/client.go +++ b/client.go @@ -245,27 +245,67 @@ func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err err err = io.EOF return } - piece := t.Pieces[index] pieceOff := pp.Integer(off % int64(t.usualPieceSize())) pieceLeft := int(t.PieceLength(pp.Integer(index)) - pieceOff) if pieceLeft <= 0 { err = io.EOF return } - cl.readRaisePiecePriorities(t, off, int64(len(p))) if len(p) > pieceLeft { p = p[:pieceLeft] } if len(p) == 0 { panic(len(p)) } + cl.prepareRead(t, off) + return dataReadAt(t.data, p, off) +} + +// Sets priorities to download from the given offset. Returns when the piece +// at the given offset can be read. Returns the number of bytes that +// immediately available from the offset. +func (cl *Client) prepareRead(t *torrent, off int64) (n int64) { + index := int(off / int64(t.usualPieceSize())) + // Reading outside the bounds of a file is an error. + if index < 0 || index >= t.numPieces() { + return + } + piece := t.Pieces[index] + cl.readRaisePiecePriorities(t, off) for !piece.Complete() && !t.isClosed() { piece.Event.Wait() } - return t.data.ReadAt(p, off) + return t.Info.Piece(index).Length() - off%t.Info.PieceLength } -func (cl *Client) readRaisePiecePriorities(t *torrent, off, _len int64) { +func (T Torrent) prepareRead(off int64) (avail int64) { + T.cl.mu.Lock() + defer T.cl.mu.Unlock() + return T.cl.prepareRead(T.torrent, off) +} + +// Data implements a streaming interface that's more efficient than ReadAt. +type SectionOpener interface { + OpenSection(off, n int64) (io.ReadCloser, error) +} + +func dataReadAt(d Data, b []byte, off int64) (n int, err error) { + if ra, ok := d.(io.ReaderAt); ok { + return ra.ReadAt(b, off) + } + if so, ok := d.(SectionOpener); ok { + var rc io.ReadCloser + rc, err = so.OpenSection(off, int64(len(b))) + if err != nil { + return + } + defer rc.Close() + return io.ReadFull(rc, b) + } + panic(fmt.Sprintf("can't read from %T", d)) +} + +func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) { index := int(off / int64(t.usualPieceSize())) cl.raisePiecePriority(t, index, piecePriorityNow) index++ @@ -1185,7 +1225,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error { // routine. // c.PeerRequests[request] = struct{}{} p := make([]byte, msg.Length) - n, err := t.data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin)) + n, err := dataReadAt(t.data, p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin)) if err != nil { return fmt.Errorf("reading t data to serve request %q: %s", request, err) } @@ -1555,15 +1595,6 @@ func (cl *Client) setStorage(t *torrent, td Data) (err error) { type TorrentDataOpener func(*metainfo.Info) (StatelessData, error) -type statelessDataWrapper struct { - StatelessData -} - -func (statelessDataWrapper) PieceComplete(int) bool { return false } -func (statelessDataWrapper) PieceCompleted(int) error { return nil } - -var _ Data = statelessDataWrapper{} - func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) { err = t.setMetadata(md, bytes, &cl.mu) if err != nil { @@ -1580,14 +1611,10 @@ func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err e return } close(t.gotMetainfo) - stateless, err := cl.torrentDataOpener(&md) + td, err := cl.torrentDataOpener(&md) if err != nil { return } - td, ok := stateless.(Data) - if !ok { - td = statelessDataWrapper{stateless} - } err = cl.setStorage(t, td) return } @@ -1695,6 +1722,61 @@ func (f File) Path() string { return f.path } +type Handle interface { + io.Reader + io.Seeker + io.Closer +} + +// Implements a Handle within a subsection of another Handle. +type sectionHandle struct { + h Handle + off, n, cur int64 +} + +func (me *sectionHandle) Seek(offset int64, whence int) (ret int64, err error) { + if whence == 0 { + offset += me.off + } else if whence == 2 { + whence = 0 + offset = me.off + me.n + } + ret, err = me.h.Seek(offset, whence) + ret -= me.off + return +} + +func (me *sectionHandle) Close() error { + return me.h.Close() +} + +func (me *sectionHandle) Read(b []byte) (n int, err error) { + max := me.off + me.n - me.cur + if int64(len(b)) > max { + b = b[:max] + } + n, err = me.h.Read(b) + me.cur += int64(n) + if err != nil { + return + } + if me.cur == me.off+me.n { + err = io.EOF + } + return +} + +func (f File) Open() (h Handle, err error) { + h = f.t.NewReadHandle() + _, err = h.Seek(f.offset, os.SEEK_SET) + if err != nil { + h.Close() + return + } + h = §ionHandle{h, f.offset, f.Length(), f.offset} + return +} + func (f File) ReadAt(p []byte, off int64) (n int, err error) { maxLen := f.length - off if int64(len(p)) > maxLen { @@ -1745,16 +1827,17 @@ func (f *File) PrioritizeRegion(off, len int64) { // Returns handles to the files in the torrent. This requires the metainfo is // available first. func (t Torrent) Files() (ret []File) { - select { - case <-t.GotMetainfo: - default: + t.cl.mu.Lock() + info := t.Info + t.cl.mu.Unlock() + if info == nil { return } var offset int64 - for _, fi := range t.Info.UpvertedFiles() { + for _, fi := range info.UpvertedFiles() { ret = append(ret, File{ t, - strings.Join(append([]string{t.Info.Name}, fi.Path...), "/"), + strings.Join(append([]string{info.Name}, fi.Path...), "/"), offset, fi.Length, fi, @@ -2219,10 +2302,12 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) { } p.EverHashed = true if correct { - err := t.data.PieceCompleted(int(piece)) - if err != nil { - log.Printf("error completing piece: %s", err) - correct = false + if sd, ok := t.data.(StatefulData); ok { + err := sd.PieceCompleted(int(piece)) + if err != nil { + log.Printf("error completing piece: %s", err) + correct = false + } } } if correct { diff --git a/data/blob/blob.go b/data/blob/blob.go index f8ece667..2f871d6f 100644 --- a/data/blob/blob.go +++ b/data/blob/blob.go @@ -90,10 +90,9 @@ func (me *data) WriteAt(p []byte, off int64) (n int, err error) { } func (me *data) pieceReader(piece int, off int64) (ret io.ReadCloser, err error) { - hash := me.pieceHashHex(piece) - f, err := os.Open(me.baseDir + "/complete/" + hash) + f, err := os.Open(me.completedPiecePath(piece)) if os.IsNotExist(err) { - f, err = os.Open(me.baseDir + "/incomplete/" + hash) + f, err = os.Open(me.incompletePiecePath(piece)) if os.IsNotExist(err) { err = io.EOF return diff --git a/data/file/file.go b/data/file/file.go index 5e6608de..4a22e25d 100644 --- a/data/file/file.go +++ b/data/file/file.go @@ -2,6 +2,7 @@ package file import ( "io" + "io/ioutil" "os" "path/filepath" @@ -47,8 +48,6 @@ func (me data) ReadAt(p []byte, off int64) (n int, err error) { return } -func (me data) Close() {} - func (me data) WriteAt(p []byte, off int64) (n int, err error) { for _, fi := range me.info.UpvertedFiles() { if off >= fi.Length { diff --git a/torrent.go b/torrent.go index 5217db56..890d77ac 100644 --- a/torrent.go +++ b/torrent.go @@ -2,6 +2,7 @@ package torrent import ( "container/heap" + "errors" "fmt" "io" "log" @@ -19,6 +20,9 @@ import ( func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) { piece := t.Pieces[index] + if piece.complete { + return 0 + } if !piece.EverHashed { return t.PieceLength(index) } @@ -39,14 +43,22 @@ type peersKey struct { } type StatelessData interface { - ReadAt(p []byte, off int64) (n int, err error) - Close() + // OpenSection(off, n int64) (io.ReadCloser, error) + // ReadAt(p []byte, off int64) (n int, err error) + // Close() WriteAt(p []byte, off int64) (n int, err error) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) } +// Represents data storage for a Torrent. Additional optional interfaces to +// implement are io.Closer, io.ReaderAt, StatefulData, and SectionOpener. type Data interface { StatelessData +} + +// Data maintains per-piece persistent state. +type StatefulData interface { + Data // We believe the piece data will pass a hash check. PieceCompleted(index int) error // Returns true if the piece is complete. @@ -93,6 +105,79 @@ type torrent struct { pruneTimer *time.Timer } +// A file-like handle to torrent data that implements SectionOpener. Opened +// sections are be reused so long as Reads are contiguous. +type handle struct { + rc io.ReadCloser + curOff int64 + so SectionOpener + size int64 + t Torrent +} + +func (h *handle) Close() error { + if h.rc != nil { + return h.rc.Close() + } + return nil +} + +func (h *handle) Read(b []byte) (n int, err error) { + max := h.t.prepareRead(h.curOff) + if int64(len(b)) > max { + b = b[:max] + } + if h.rc == nil { + h.rc, err = h.so.OpenSection(h.curOff, h.size-h.curOff) + if err != nil { + return + } + } + n, err = h.rc.Read(b) + h.curOff += int64(n) + return +} + +func (h *handle) Seek(off int64, whence int) (newOff int64, err error) { + switch whence { + case 0: + newOff = off + case 1: + newOff += off + case 2: + newOff = h.size + off + default: + err = errors.New("bad whence") + } + if newOff == h.curOff { + return + } + h.curOff = newOff + if h.rc != nil { + h.Close() + h.rc = nil + } + return +} + +// Implements Handle on top of an io.SectionReader. +type sectionReaderHandle struct { + *io.SectionReader +} + +func (sectionReaderHandle) Close() error { return nil } + +func (T Torrent) NewReadHandle() Handle { + if so, ok := T.data.(SectionOpener); ok { + return &handle{ + so: so, + size: T.Length(), + t: T, + } + } + return sectionReaderHandle{io.NewSectionReader(T, 0, T.Length())} +} + func (t *torrent) numConnsUnchoked() (num int) { for _, c := range t.Conns { if !c.PeerChoked { @@ -215,12 +300,14 @@ func (t *torrent) setMetadata(md metainfo.Info, infoBytes []byte, eventLocker sy } func (t *torrent) setStorage(td Data) (err error) { - if t.data != nil { - t.data.Close() + if c, ok := t.data.(io.Closer); ok { + c.Close() } t.data = td - for i, p := range t.Pieces { - p.complete = t.data.PieceComplete(i) + if sd, ok := t.data.(StatefulData); ok { + for i, p := range t.Pieces { + p.complete = sd.PieceComplete(i) + } } return } @@ -490,8 +577,8 @@ func (t *torrent) close() (err error) { } t.ceaseNetworking() close(t.closing) - if t.data != nil { - t.data.Close() + if c, ok := t.data.(io.Closer); ok { + c.Close() } for _, conn := range t.Conns { conn.Close()