]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Rework the Torrent Reader interface, to allow reader options, and add "responsive...
authorMatt Joiner <anacrolix@gmail.com>
Tue, 14 Apr 2015 13:59:41 +0000 (23:59 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 14 Apr 2015 13:59:41 +0000 (23:59 +1000)
Had several weeks of testing. Removes a lot of the "helper" reading methods, but this was necessary to allow per-Torrent reading options.

TODO
client.go
client_test.go
connection.go
data/blob/blob.go
file.go [new file with mode: 0644]
fs/torrentfs.go
reader.go [new file with mode: 0644]
t.go [new file with mode: 0644]
torrent.go

diff --git a/TODO b/TODO
index 72aabfc623f3fa1e5acefc526e0fa012417735a4..d1b2c6bf094033635e92ffc5238b34e878b61c76 100644 (file)
--- a/TODO
+++ b/TODO
@@ -1,7 +1,10 @@
  * Track upload and download data.
- * Emulate a UDP server in the UDP tracker tests.
- * Make use of sparse file regions in download data for faster hashing.
- * If we're choked and interested, we never send not-interested if there's nothing we want?
- * Don't announce torrents that don't need active peers. It spams UDP, fills memory, and publicizes what we have loaded.
- * Randomize triedAddrs bloom filter to allow different Addr sets on each Announce.
- * When lots of good connections, it'll do a huge readahead, then refuse to trickle new pieces because we sent not interested to them all, thereby reducing the number of unchoked connections.
\ No newline at end of file
+ * Emulate a UDP server in the UDP tracker tests rather than communicating with the Internet.
+ * Make use of sparse file regions in download data for faster hashing. This is available as whence 3 and 4 on some OS?
+ * When we're choked and interested, are we not interested if there's no longer anything that we want?
+ * dht: Randomize triedAddrs bloom filter to allow different Addr sets on each Announce.
+ * dht: Verify that the triedAddrs bloom filter is working well, github's willf made a bunch of changes.
+ * Rearrange the local-peer choked/interested status flags to be more natural to read.
+ * Check that pruning is working correctly. worstConns sorting might need an adjustment to how it factors in the good/unwanted chunks ratio.
+ * data/blob: Deleting incomplete data triggers io.ErrUnexpectedEOF that isn't recovered from.
+ * Responsive reader needs to apply some readahead.
\ No newline at end of file
index 9b60a3a82c789f95643eee1b9286c54fe141797b..46d327919823778976bdca4c9d3aa7e0b069555c 100644 (file)
--- a/client.go
+++ b/client.go
@@ -257,80 +257,22 @@ func (cl *Client) WriteStatus(_w io.Writer) {
        }
 }
 
-// Read torrent data at the given offset. Will block until it is available.
-func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err error) {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
-       index := int(off / int64(t.usualPieceSize()))
-       // Reading outside the bounds of a file is an error.
-       if index < 0 {
-               err = os.ErrInvalid
-               return
-       }
-       if int(index) >= len(t.Pieces) {
-               err = io.EOF
-               return
-       }
-       pieceOff := pp.Integer(off % int64(t.usualPieceSize()))
-       pieceLeft := int(t.pieceLength(index) - pieceOff)
-       if pieceLeft <= 0 {
-               err = io.EOF
-               return
-       }
-       if len(p) > pieceLeft {
-               p = p[:pieceLeft]
-       }
-       if len(p) == 0 {
-               panic(len(p))
-       }
-       // TODO: ReadAt should always try to fill the buffer.
-       for {
-               avail := cl.prepareRead(t, off)
-               if avail < int64(len(p)) {
-                       p = p[:avail]
-               }
-               n, err = dataReadAt(t.data, p, off)
-               if n != 0 || err != io.ErrUnexpectedEOF {
-                       break
-               }
-               // If we reach here, the data we thought was ready, isn't. So we
-               // prepare it again, and retry.
-       }
-       return
-}
-
-// Sets priorities to download from the given offset. Returns when the piece
-// at the given offset can be read. Returns the number of bytes that are
-// immediately available from the offset.
-func (cl *Client) prepareRead(t *torrent, off int64) (n int64) {
-       index := int(off / int64(t.usualPieceSize()))
-       // Reading outside the bounds of a file is an error.
-       if index < 0 || index >= t.numPieces() {
-               return
-       }
-       piece := t.Pieces[index]
-       cl.readRaisePiecePriorities(t, off)
-       for !t.pieceComplete(index) && !t.isClosed() {
-               // This is to prevent being starved if a piece is dropped before we
-               // can read it.
-               cl.readRaisePiecePriorities(t, off)
-               piece.Event.Wait()
-       }
-       return t.Info.Piece(index).Length() - off%t.Info.PieceLength
-}
-
-func (T Torrent) prepareRead(off int64) (avail int64) {
-       T.cl.mu.Lock()
-       defer T.cl.mu.Unlock()
-       return T.cl.prepareRead(T.torrent, off)
-}
-
-// Data implements a streaming interface that's more efficient than ReadAt.
+// A Data that implements this has a streaming interface that should be
+// preferred over ReadAt. For example, the data is stored in blocks on the
+// network and have a fixed cost to open.
 type SectionOpener interface {
+       // Open a ReadCloser at the given offset into torrent data. n is how many
+       // bytes we intend to read.
        OpenSection(off, n int64) (io.ReadCloser, error)
 }
 
 func dataReadAt(d data.Data, b []byte, off int64) (n int, err error) {
+       // defer func() {
+       //      if err == io.ErrUnexpectedEOF && n != 0 {
+       //              err = nil
+       //      }
+       // }()
+       // log.Println("data read at", len(b), off)
 again:
        if ra, ok := d.(io.ReaderAt); ok {
                return ra.ReadAt(b, off)
@@ -357,7 +299,7 @@ func readaheadPieces(readahead, pieceLength int64) int {
        return int((readahead+pieceLength-1)/pieceLength - 1)
 }
 
-func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) {
+func (cl *Client) readRaisePiecePriorities(t *torrent, off, readaheadBytes int64) {
        index := int(off / int64(t.usualPieceSize()))
        cl.raisePiecePriority(t, index, piecePriorityNow)
        index++
@@ -365,7 +307,7 @@ func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) {
                return
        }
        cl.raisePiecePriority(t, index, piecePriorityNext)
-       for range iter.N(readaheadPieces(5*1024*1024, t.Info.PieceLength)) {
+       for range iter.N(readaheadPieces(readaheadBytes, t.Info.PieceLength)) {
                index++
                if index >= t.numPieces() {
                        break
@@ -374,6 +316,30 @@ func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) {
        }
 }
 
+func (cl *Client) addUrgentRequests(t *torrent, off int64, n int) {
+       for n > 0 {
+               req, ok := t.offsetRequest(off)
+               if !ok {
+                       break
+               }
+               if _, ok := t.urgent[req]; !ok && !t.haveChunk(req) {
+                       if t.urgent == nil {
+                               t.urgent = make(map[request]struct{}, (n+chunkSize-1)/chunkSize)
+                       }
+                       t.urgent[req] = struct{}{}
+                       cl.event.Broadcast() // Why?
+                       index := int(req.Index)
+                       cl.queueFirstHash(t, index)
+                       cl.pieceChanged(t, index)
+               }
+               reqOff := t.requestOffset(req)
+               n1 := req.Length - pp.Integer(off-reqOff)
+               off += int64(n1)
+               n -= int(n1)
+       }
+       // log.Print(t.urgent)
+}
+
 func (cl *Client) configDir() string {
        if cl._configDir == "" {
                return filepath.Join(os.Getenv("HOME"), ".config/torrent")
@@ -582,12 +548,12 @@ func NewClient(cfg *Config) (cl *Client, err error) {
                        dhtCfg.Conn = cl.utpSock.PacketConn()
                }
                cl.dHT, err = dht.NewServer(dhtCfg)
-               if cl.ipBlockList != nil {
-                       cl.dHT.SetIPBlockList(cl.ipBlockList)
-               }
                if err != nil {
                        return
                }
+               if cl.ipBlockList != nil {
+                       cl.dHT.SetIPBlockList(cl.ipBlockList)
+               }
        }
 
        return
@@ -1894,6 +1860,9 @@ func (cl *Client) setStorage(t *torrent, td data.Data) (err error) {
        if err != nil {
                return
        }
+       for index := range iter.N(t.numPieces()) {
+               cl.pieceChanged(t, index)
+       }
        cl.startTorrent(t)
        return
 }
@@ -1990,12 +1959,6 @@ func (t *torrent) addTrackers(announceList [][]string) {
        t.Trackers = newTrackers
 }
 
-// A handle to a live torrent within a Client.
-type Torrent struct {
-       cl *Client
-       *torrent
-}
-
 // Don't call this before the info is available.
 func (t *torrent) BytesCompleted() int64 {
        if !t.haveInfo() {
@@ -2014,23 +1977,6 @@ func (t Torrent) Drop() {
        t.cl.mu.Unlock()
 }
 
-// Provides access to regions of torrent data that correspond to its files.
-type File struct {
-       t      Torrent
-       path   string
-       offset int64
-       length int64
-       fi     metainfo.FileInfo
-}
-
-func (f File) FileInfo() metainfo.FileInfo {
-       return f.fi
-}
-
-func (f File) Path() string {
-       return f.path
-}
-
 // A file-like handle to some torrent data resource.
 type Handle interface {
        io.Reader
@@ -2039,114 +1985,6 @@ type Handle interface {
        io.ReaderAt
 }
 
-// Implements a Handle within a subsection of another Handle.
-type sectionHandle struct {
-       h           Handle
-       off, n, cur int64
-}
-
-func (me *sectionHandle) Seek(offset int64, whence int) (ret int64, err error) {
-       if whence == 0 {
-               offset += me.off
-       } else if whence == 2 {
-               whence = 0
-               offset += me.off + me.n
-       }
-       ret, err = me.h.Seek(offset, whence)
-       me.cur = ret
-       ret -= me.off
-       return
-}
-
-func (me *sectionHandle) Close() error {
-       return me.h.Close()
-}
-
-func (me *sectionHandle) Read(b []byte) (n int, err error) {
-       max := me.off + me.n - me.cur
-       if int64(len(b)) > max {
-               b = b[:max]
-       }
-       n, err = me.h.Read(b)
-       me.cur += int64(n)
-       if err != nil {
-               return
-       }
-       if me.cur == me.off+me.n {
-               err = io.EOF
-       }
-       return
-}
-
-func (me *sectionHandle) ReadAt(b []byte, off int64) (n int, err error) {
-       if off >= me.n {
-               err = io.EOF
-               return
-       }
-       if int64(len(b)) >= me.n-off {
-               b = b[:me.n-off]
-       }
-       return me.h.ReadAt(b, me.off+off)
-}
-
-func (f File) Open() (h Handle, err error) {
-       h = f.t.NewReadHandle()
-       _, err = h.Seek(f.offset, os.SEEK_SET)
-       if err != nil {
-               h.Close()
-               return
-       }
-       h = &sectionHandle{h, f.offset, f.Length(), f.offset}
-       return
-}
-
-func (f File) ReadAt(p []byte, off int64) (n int, err error) {
-       maxLen := f.length - off
-       if int64(len(p)) > maxLen {
-               p = p[:maxLen]
-       }
-       return f.t.ReadAt(p, off+f.offset)
-}
-
-func (f *File) Length() int64 {
-       return f.length
-}
-
-type FilePieceState struct {
-       Length int64
-       State  byte
-}
-
-func (f *File) Progress() (ret []FilePieceState) {
-       pieceSize := int64(f.t.usualPieceSize())
-       off := f.offset % pieceSize
-       remaining := f.length
-       for i := int(f.offset / pieceSize); ; i++ {
-               if remaining == 0 {
-                       break
-               }
-               len1 := pieceSize - off
-               if len1 > remaining {
-                       len1 = remaining
-               }
-               ret = append(ret, FilePieceState{len1, f.t.pieceStatusChar(i)})
-               off = 0
-               remaining -= len1
-       }
-       return
-}
-
-func (f *File) PrioritizeRegion(off, len int64) {
-       if off < 0 || off >= f.length {
-               return
-       }
-       if off+len > f.length {
-               len = f.length - off
-       }
-       off += f.offset
-       f.t.SetRegionPriority(off, len)
-}
-
 // Returns handles to the files in the torrent. This requires the metainfo is
 // available first.
 func (t Torrent) Files() (ret []File) {
@@ -2200,10 +2038,6 @@ func (t Torrent) DownloadAll() {
        t.cl.raisePiecePriority(t.torrent, t.numPieces()-1, piecePriorityReadahead)
 }
 
-func (me Torrent) ReadAt(p []byte, off int64) (n int, err error) {
-       return me.cl.torrentReadAt(me.torrent, off, p)
-}
-
 // Returns nil metainfo if it isn't in the cache. Checks that the retrieved
 // metainfo has the correct infohash.
 func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) {
@@ -2612,11 +2446,17 @@ func (me *Client) fillRequests(t *torrent, c *connection) {
                }
        }
        addRequest := func(req request) (again bool) {
+               // TODO: Couldn't this check also be done *after* the request?
                if len(c.Requests) >= 64 {
                        return false
                }
                return c.Request(req)
        }
+       for req := range t.urgent {
+               if !addRequest(req) {
+                       return
+               }
+       }
        for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
                pieceIndex := e.Piece()
                if !c.PeerHasPiece(pieceIndex) {
@@ -2664,7 +2504,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
        piece := t.Pieces[req.Index]
 
        // Do we actually want this chunk?
-       if _, ok := piece.PendingChunkSpecs[req.chunkSpec]; !ok || piece.Priority == piecePriorityNone {
+       if !t.wantChunk(req) {
                unusedDownloadedChunksCount.Add(1)
                c.UnwantedChunksReceived++
                return nil
@@ -2679,8 +2519,11 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
                return fmt.Errorf("error writing chunk: %s", err)
        }
 
+       // log.Println("got chunk", req)
+       piece.Event.Broadcast()
        // Record that we have the chunk.
        delete(piece.PendingChunkSpecs, req.chunkSpec)
+       delete(t.urgent, req)
        if len(piece.PendingChunkSpecs) == 0 {
                for _, c := range t.Conns {
                        c.pieceRequestOrder.DeletePiece(int(req.Index))
@@ -2717,18 +2560,24 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
        me.pieceChanged(t, int(piece))
 }
 
+// TODO: Check this isn't called more than once for each piece being correct.
 func (me *Client) pieceChanged(t *torrent, piece int) {
        correct := t.pieceComplete(piece)
        p := t.Pieces[piece]
        if correct {
                p.Priority = piecePriorityNone
                p.PendingChunkSpecs = nil
+               for req := range t.urgent {
+                       if int(req.Index) == piece {
+                               delete(t.urgent, req)
+                       }
+               }
                p.Event.Broadcast()
        } else {
                if len(p.PendingChunkSpecs) == 0 {
                        t.pendAllChunkSpecs(int(piece))
                }
-               if p.Priority != piecePriorityNone {
+               if t.wantPiece(piece) {
                        me.openNewConns(t)
                }
        }
index 08dbc80cd8b655647a35be4fbf58ec1e1b481ebc..05d7c90924e505c2e67ec3dfab292aa4b6a814fd 100644 (file)
@@ -3,7 +3,6 @@ package torrent
 import (
        "encoding/binary"
        "fmt"
-       "io"
        "io/ioutil"
        "log"
        "net"
@@ -273,9 +272,11 @@ func TestClientTransfer(t *testing.T) {
                        Port: util.AddrPort(seeder.ListenAddr()),
                },
        })
-       _greeting, err := ioutil.ReadAll(io.NewSectionReader(leecherGreeting, 0, leecherGreeting.Length()))
+       r := leecherGreeting.NewReader()
+       defer r.Close()
+       _greeting, err := ioutil.ReadAll(r)
        if err != nil {
-               t.Fatal(err)
+               t.Fatalf("%q %s", string(_greeting), err)
        }
        greeting := string(_greeting)
        if greeting != testutil.GreetingFileContents {
index b365f743d0e544f3834fc8dc6f727e37c6e7ec9f..25a834708a41773949bbe40312738a7078fa3422 100644 (file)
@@ -106,8 +106,13 @@ func (cn *connection) pendPiece(piece int, priority piecePriority) {
                return
        }
        pp := cn.piecePriorities[piece]
-       // Priority goes to Now, then Next in connection order. Then Readahead in
-       // by piece index. Then normal again by connection order.
+       // Priority regions not to scale. Within each region, piece is randomized
+       // according to connection.
+
+       // [ Now         ]
+       //  [ Next       ]
+       //   [ Readahead ]
+       //                [ Normal ]
        key := func() int {
                switch priority {
                case piecePriorityNow:
index a3ade8e687a6dd08547159b200af65d6eca6f84f..934bbc4826fe473f6547e0238fb4ceb813b23db8 100644 (file)
@@ -3,6 +3,7 @@ package blob
 import (
        "encoding/hex"
        "io"
+       "log"
 
        "github.com/anacrolix/libtorgo/metainfo"
 )
@@ -19,16 +20,36 @@ func (me *data) pieceHashHex(i int) string {
 func (me *data) Close() {}
 
 func (me *data) ReadAt(b []byte, off int64) (n int, err error) {
-       p := me.info.Piece(int(off / me.info.PieceLength))
-       f := me.store.pieceRead(p)
-       if f == nil {
-               err = io.ErrUnexpectedEOF
-               return
-       }
-       defer f.Close()
-       n, err = f.ReadAt(b, off%me.info.PieceLength)
-       if err == io.EOF {
-               err = io.ErrUnexpectedEOF
+       for len(b) != 0 {
+               if off >= me.info.TotalLength() {
+                       err = io.EOF
+                       break
+               }
+               p := me.info.Piece(int(off / me.info.PieceLength))
+               f := me.store.pieceRead(p)
+               if f == nil {
+                       log.Println("piece not found", p)
+                       err = io.ErrUnexpectedEOF
+                       break
+               }
+               b1 := b
+               maxN1 := int(p.Length() - off%me.info.PieceLength)
+               if len(b1) > maxN1 {
+                       b1 = b1[:maxN1]
+               }
+               var n1 int
+               n1, err = f.ReadAt(b1, off%me.info.PieceLength)
+               f.Close()
+               n += n1
+               off += int64(n1)
+               b = b[n1:]
+               if err == io.EOF {
+                       err = nil
+                       break
+               }
+               if err != nil {
+                       break
+               }
        }
        return
 }
diff --git a/file.go b/file.go
new file mode 100644 (file)
index 0000000..1ec0d21
--- /dev/null
+++ b/file.go
@@ -0,0 +1,64 @@
+package torrent
+
+import "github.com/anacrolix/libtorgo/metainfo"
+
+// Provides access to regions of torrent data that correspond to its files.
+type File struct {
+       t      Torrent
+       path   string
+       offset int64
+       length int64
+       fi     metainfo.FileInfo
+}
+
+// Data for this file begins this far into the torrent.
+func (f *File) Offset() int64 {
+       return f.offset
+}
+
+func (f File) FileInfo() metainfo.FileInfo {
+       return f.fi
+}
+
+func (f File) Path() string {
+       return f.path
+}
+
+func (f *File) Length() int64 {
+       return f.length
+}
+
+type FilePieceState struct {
+       Length int64
+       State  byte
+}
+
+func (f *File) Progress() (ret []FilePieceState) {
+       pieceSize := int64(f.t.usualPieceSize())
+       off := f.offset % pieceSize
+       remaining := f.length
+       for i := int(f.offset / pieceSize); ; i++ {
+               if remaining == 0 {
+                       break
+               }
+               len1 := pieceSize - off
+               if len1 > remaining {
+                       len1 = remaining
+               }
+               ret = append(ret, FilePieceState{len1, f.t.pieceStatusChar(i)})
+               off = 0
+               remaining -= len1
+       }
+       return
+}
+
+func (f *File) PrioritizeRegion(off, len int64) {
+       if off < 0 || off >= f.length {
+               return
+       }
+       if off+len > f.length {
+               len = f.length - off
+       }
+       off += f.offset
+       f.t.SetRegionPriority(off, len)
+}
index 47fd6109adba4207676d78d33ce138db6f56d487..ffb9df68b11e05186c0fcb4e7f78ba8d498dd2a4 100644 (file)
@@ -91,7 +91,10 @@ func blockingRead(ctx context.Context, fs *TorrentFS, t torrent.Torrent, off int
        )
        readDone := make(chan struct{})
        go func() {
-               _n, _err = t.ReadAt(p, off)
+               r := t.NewReader()
+               defer r.Close()
+               _n, _err = r.ReadAt(p, off)
+               log.Println(_n, p)
                close(readDone)
        }()
        select {
diff --git a/reader.go b/reader.go
new file mode 100644 (file)
index 0000000..60357f4
--- /dev/null
+++ b/reader.go
@@ -0,0 +1,132 @@
+package torrent
+
+import (
+       "errors"
+       "io"
+       "os"
+)
+
+// Accesses torrent data via a client.
+type Reader struct {
+       t          *Torrent
+       pos        int64
+       responsive bool
+       readahead  int64
+}
+
+var _ io.ReadCloser = &Reader{}
+
+// Don't wait for pieces to complete and be verified. Read calls return as
+// soon as they can when the underlying chunks become available.
+func (r *Reader) SetResponsive() {
+       r.responsive = true
+}
+
+func (r *Reader) SetReadahead(readahead int64) {
+       r.readahead = readahead
+}
+
+func (r *Reader) raisePriorities(off int64, n int) {
+       if r.responsive {
+               r.t.cl.addUrgentRequests(r.t.torrent, off, n)
+       }
+       r.t.cl.readRaisePiecePriorities(r.t.torrent, off, int64(n)+r.readahead)
+}
+
+func (r *Reader) readable(off int64) (ret bool) {
+       // log.Println("readable", off)
+       // defer func() {
+       //      log.Println("readable", ret)
+       // }()
+       req, ok := r.t.offsetRequest(off)
+       if !ok {
+               panic(off)
+       }
+       if r.responsive {
+               return r.t.haveChunk(req)
+       }
+       return r.t.pieceComplete(int(req.Index))
+}
+
+// How many bytes are available to read. Max is the most we could require.
+func (r *Reader) available(off, max int64) (ret int64) {
+       for max > 0 {
+               req, ok := r.t.offsetRequest(off)
+               if !ok {
+                       break
+               }
+               if !r.t.haveChunk(req) {
+                       break
+               }
+               len1 := int64(req.Length) - (off - r.t.requestOffset(req))
+               max -= len1
+               ret += len1
+               off += len1
+       }
+       return
+}
+
+func (r *Reader) waitReadable(off int64) {
+       r.t.Pieces[off/int64(r.t.usualPieceSize())].Event.Wait()
+}
+
+func (r *Reader) ReadAt(b []byte, off int64) (n int, err error) {
+       return r.readAt(b, off)
+}
+
+func (r *Reader) Read(b []byte) (n int, err error) {
+       n, err = r.readAt(b, r.pos)
+       r.pos += int64(n)
+       if n != 0 && err == io.ErrUnexpectedEOF {
+               err = nil
+       }
+       return
+}
+
+func (r *Reader) readAt(b []byte, pos int64) (n int, err error) {
+       // defer func() {
+       //      log.Println(pos, n, err)
+       // }()
+       r.t.cl.mu.Lock()
+       defer r.t.cl.mu.Unlock()
+       maxLen := r.t.Info.TotalLength() - pos
+       if maxLen <= 0 {
+               err = io.EOF
+               return
+       }
+       if int64(len(b)) > maxLen {
+               b = b[:maxLen]
+       }
+       r.raisePriorities(pos, len(b))
+       for !r.readable(pos) {
+               r.raisePriorities(pos, len(b))
+               r.waitReadable(pos)
+       }
+       avail := r.available(pos, int64(len(b)))
+       // log.Println("available", avail)
+       if int64(len(b)) > avail {
+               b = b[:avail]
+       }
+       n, err = dataReadAt(r.t.data, b, pos)
+       return
+}
+
+func (r *Reader) Close() error {
+       r.t = nil
+       return nil
+}
+
+func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
+       switch whence {
+       case os.SEEK_SET:
+               r.pos = off
+       case os.SEEK_CUR:
+               r.pos += off
+       case os.SEEK_END:
+               r.pos = r.t.Info.TotalLength() + off
+       default:
+               err = errors.New("bad whence")
+       }
+       ret = r.pos
+       return
+}
diff --git a/t.go b/t.go
new file mode 100644 (file)
index 0000000..be35c8c
--- /dev/null
+++ b/t.go
@@ -0,0 +1,17 @@
+package torrent
+
+// The public interface for a torrent within a Client.
+
+// A handle to a live torrent within a Client.
+type Torrent struct {
+       cl *Client
+       *torrent
+}
+
+func (t *Torrent) NewReader() (ret *Reader) {
+       ret = &Reader{
+               t:         t,
+               readahead: 5 * 1024 * 1024,
+       }
+       return
+}
index 8b5dbb715a7eee3bcf154833a66eb7212721b8f3..72d0b1299e9607035d9a4bdb69da3deff4ef7d20 100644 (file)
@@ -2,12 +2,10 @@ package torrent
 
 import (
        "container/heap"
-       "errors"
        "fmt"
        "io"
        "log"
        "net"
-       "os"
        "sort"
        "sync"
        "time"
@@ -66,6 +64,9 @@ type torrent struct {
 
        InfoHash InfoHash
        Pieces   []*piece
+       // Chunks that are wanted before all others. This is for
+       // responsive/streaming readers that want to unblock ASAP.
+       urgent map[request]struct{}
        // Total length of the torrent in bytes. Stored because it's not O(1) to
        // get this from the info dict.
        length int64
@@ -110,91 +111,6 @@ func (t *torrent) pieceComplete(piece int) bool {
        return t.data != nil && t.data.PieceComplete(piece)
 }
 
-// A file-like handle to torrent data that implements SectionOpener. Opened
-// sections will be reused so long as Reads and ReadAt's are contiguous.
-type handle struct {
-       rc     io.ReadCloser
-       rcOff  int64
-       curOff int64
-       so     SectionOpener
-       size   int64
-       t      Torrent
-}
-
-func (h *handle) Close() error {
-       if h.rc != nil {
-               return h.rc.Close()
-       }
-       return nil
-}
-
-func (h *handle) ReadAt(b []byte, off int64) (n int, err error) {
-       return h.readAt(b, off)
-}
-
-func (h *handle) readAt(b []byte, off int64) (n int, err error) {
-       avail := h.t.prepareRead(off)
-       if int64(len(b)) > avail {
-               b = b[:avail]
-       }
-       if int64(len(b)) > h.size-off {
-               b = b[:h.size-off]
-       }
-       if h.rcOff != off && h.rc != nil {
-               h.rc.Close()
-               h.rc = nil
-       }
-       if h.rc == nil {
-               h.rc, err = h.so.OpenSection(off, h.size-off)
-               if err != nil {
-                       return
-               }
-               h.rcOff = off
-       }
-       n, err = h.rc.Read(b)
-       h.rcOff += int64(n)
-       return
-}
-
-func (h *handle) Read(b []byte) (n int, err error) {
-       n, err = h.readAt(b, h.curOff)
-       h.curOff = h.rcOff
-       return
-}
-
-func (h *handle) Seek(off int64, whence int) (newOff int64, err error) {
-       switch whence {
-       case os.SEEK_SET:
-               h.curOff = off
-       case os.SEEK_CUR:
-               h.curOff += off
-       case os.SEEK_END:
-               h.curOff = h.size + off
-       default:
-               err = errors.New("bad whence")
-       }
-       newOff = h.curOff
-       return
-}
-
-// Implements Handle on top of an io.SectionReader.
-type sectionReaderHandle struct {
-       *io.SectionReader
-}
-
-func (sectionReaderHandle) Close() error { return nil }
-
-func (T Torrent) NewReadHandle() Handle {
-       if so, ok := T.data.(SectionOpener); ok {
-               return &handle{
-                       so:   so,
-                       size: T.Length(),
-                       t:    T,
-               }
-       }
-       return sectionReaderHandle{io.NewSectionReader(T, 0, T.Length())}
-}
-
 func (t *torrent) numConnsUnchoked() (num int) {
        for _, c := range t.Conns {
                if !c.PeerChoked {
@@ -238,7 +154,9 @@ func (t *torrent) ceaseNetworking() {
        for _, c := range t.Conns {
                c.Close()
        }
-       t.pruneTimer.Stop()
+       if t.pruneTimer != nil {
+               t.pruneTimer.Stop()
+       }
 }
 
 func (t *torrent) addPeer(p Peer) {
@@ -502,6 +420,11 @@ func (t *torrent) writeStatus(w io.Writer) {
                }
                fmt.Fprintln(w)
        }
+       fmt.Fprintf(w, "Urgent:")
+       for req := range t.urgent {
+               fmt.Fprintf(w, " %s", req)
+       }
+       fmt.Fprintln(w)
        fmt.Fprintf(w, "Trackers: ")
        for _, tier := range t.Trackers {
                for _, tr := range tier {
@@ -647,6 +570,7 @@ func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
 
 func (t *torrent) bitfield() (bf []bool) {
        for _, p := range t.Pieces {
+               // TODO: Check this logic.
                bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0)
        }
        return
@@ -732,11 +656,12 @@ func (t *torrent) havePiece(index int) bool {
 }
 
 func (t *torrent) haveChunk(r request) bool {
-       p := t.Pieces[r.Index]
-       if !p.EverHashed {
+       if !t.haveInfo() {
                return false
        }
-       _, ok := p.PendingChunkSpecs[r.chunkSpec]
+       piece := t.Pieces[r.Index]
+       _, ok := piece.PendingChunkSpecs[r.chunkSpec]
+       // log.Println("have chunk", r, !ok)
        return !ok
 }
 
@@ -745,16 +670,41 @@ func (t *torrent) wantChunk(r request) bool {
                return false
        }
        _, ok := t.Pieces[r.Index].PendingChunkSpecs[r.chunkSpec]
+       if ok {
+               return true
+       }
+       _, ok = t.urgent[r]
        return ok
 }
 
+func (t *torrent) urgentChunkInPiece(piece int) bool {
+       for req := range t.urgent {
+               if int(req.Index) == piece {
+                       return true
+               }
+       }
+       return false
+}
+
 func (t *torrent) wantPiece(index int) bool {
        if !t.haveInfo() {
                return false
        }
        p := t.Pieces[index]
-       // Put piece complete check last, since it's the slowest!
-       return p.Priority != piecePriorityNone && !p.QueuedForHash && !p.Hashing && !t.pieceComplete(index)
+       if p.QueuedForHash {
+               return false
+       }
+       if p.Hashing {
+               return false
+       }
+       if p.Priority == piecePriorityNone {
+               if !t.urgentChunkInPiece(index) {
+                       return false
+               }
+       }
+       // Put piece complete check last, since it's the slowest as it can involve
+       // calling out into external data stores.
+       return !t.pieceComplete(index)
 }
 
 func (t *torrent) connHasWantedPieces(c *connection) bool {