]> Sergey Matveev's repositories - btrtrc.git/blobdiff - client.go
Rework the Torrent Reader interface, to allow reader options, and add "responsive...
[btrtrc.git] / client.go
index 9b60a3a82c789f95643eee1b9286c54fe141797b..46d327919823778976bdca4c9d3aa7e0b069555c 100644 (file)
--- a/client.go
+++ b/client.go
@@ -257,80 +257,22 @@ func (cl *Client) WriteStatus(_w io.Writer) {
        }
 }
 
-// Read torrent data at the given offset. Will block until it is available.
-func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err error) {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
-       index := int(off / int64(t.usualPieceSize()))
-       // Reading outside the bounds of a file is an error.
-       if index < 0 {
-               err = os.ErrInvalid
-               return
-       }
-       if int(index) >= len(t.Pieces) {
-               err = io.EOF
-               return
-       }
-       pieceOff := pp.Integer(off % int64(t.usualPieceSize()))
-       pieceLeft := int(t.pieceLength(index) - pieceOff)
-       if pieceLeft <= 0 {
-               err = io.EOF
-               return
-       }
-       if len(p) > pieceLeft {
-               p = p[:pieceLeft]
-       }
-       if len(p) == 0 {
-               panic(len(p))
-       }
-       // TODO: ReadAt should always try to fill the buffer.
-       for {
-               avail := cl.prepareRead(t, off)
-               if avail < int64(len(p)) {
-                       p = p[:avail]
-               }
-               n, err = dataReadAt(t.data, p, off)
-               if n != 0 || err != io.ErrUnexpectedEOF {
-                       break
-               }
-               // If we reach here, the data we thought was ready, isn't. So we
-               // prepare it again, and retry.
-       }
-       return
-}
-
-// Sets priorities to download from the given offset. Returns when the piece
-// at the given offset can be read. Returns the number of bytes that are
-// immediately available from the offset.
-func (cl *Client) prepareRead(t *torrent, off int64) (n int64) {
-       index := int(off / int64(t.usualPieceSize()))
-       // Reading outside the bounds of a file is an error.
-       if index < 0 || index >= t.numPieces() {
-               return
-       }
-       piece := t.Pieces[index]
-       cl.readRaisePiecePriorities(t, off)
-       for !t.pieceComplete(index) && !t.isClosed() {
-               // This is to prevent being starved if a piece is dropped before we
-               // can read it.
-               cl.readRaisePiecePriorities(t, off)
-               piece.Event.Wait()
-       }
-       return t.Info.Piece(index).Length() - off%t.Info.PieceLength
-}
-
-func (T Torrent) prepareRead(off int64) (avail int64) {
-       T.cl.mu.Lock()
-       defer T.cl.mu.Unlock()
-       return T.cl.prepareRead(T.torrent, off)
-}
-
-// Data implements a streaming interface that's more efficient than ReadAt.
+// A Data that implements this has a streaming interface that should be
+// preferred over ReadAt. For example, the data is stored in blocks on the
+// network and have a fixed cost to open.
 type SectionOpener interface {
+       // Open a ReadCloser at the given offset into torrent data. n is how many
+       // bytes we intend to read.
        OpenSection(off, n int64) (io.ReadCloser, error)
 }
 
 func dataReadAt(d data.Data, b []byte, off int64) (n int, err error) {
+       // defer func() {
+       //      if err == io.ErrUnexpectedEOF && n != 0 {
+       //              err = nil
+       //      }
+       // }()
+       // log.Println("data read at", len(b), off)
 again:
        if ra, ok := d.(io.ReaderAt); ok {
                return ra.ReadAt(b, off)
@@ -357,7 +299,7 @@ func readaheadPieces(readahead, pieceLength int64) int {
        return int((readahead+pieceLength-1)/pieceLength - 1)
 }
 
-func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) {
+func (cl *Client) readRaisePiecePriorities(t *torrent, off, readaheadBytes int64) {
        index := int(off / int64(t.usualPieceSize()))
        cl.raisePiecePriority(t, index, piecePriorityNow)
        index++
@@ -365,7 +307,7 @@ func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) {
                return
        }
        cl.raisePiecePriority(t, index, piecePriorityNext)
-       for range iter.N(readaheadPieces(5*1024*1024, t.Info.PieceLength)) {
+       for range iter.N(readaheadPieces(readaheadBytes, t.Info.PieceLength)) {
                index++
                if index >= t.numPieces() {
                        break
@@ -374,6 +316,30 @@ func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) {
        }
 }
 
+func (cl *Client) addUrgentRequests(t *torrent, off int64, n int) {
+       for n > 0 {
+               req, ok := t.offsetRequest(off)
+               if !ok {
+                       break
+               }
+               if _, ok := t.urgent[req]; !ok && !t.haveChunk(req) {
+                       if t.urgent == nil {
+                               t.urgent = make(map[request]struct{}, (n+chunkSize-1)/chunkSize)
+                       }
+                       t.urgent[req] = struct{}{}
+                       cl.event.Broadcast() // Why?
+                       index := int(req.Index)
+                       cl.queueFirstHash(t, index)
+                       cl.pieceChanged(t, index)
+               }
+               reqOff := t.requestOffset(req)
+               n1 := req.Length - pp.Integer(off-reqOff)
+               off += int64(n1)
+               n -= int(n1)
+       }
+       // log.Print(t.urgent)
+}
+
 func (cl *Client) configDir() string {
        if cl._configDir == "" {
                return filepath.Join(os.Getenv("HOME"), ".config/torrent")
@@ -582,12 +548,12 @@ func NewClient(cfg *Config) (cl *Client, err error) {
                        dhtCfg.Conn = cl.utpSock.PacketConn()
                }
                cl.dHT, err = dht.NewServer(dhtCfg)
-               if cl.ipBlockList != nil {
-                       cl.dHT.SetIPBlockList(cl.ipBlockList)
-               }
                if err != nil {
                        return
                }
+               if cl.ipBlockList != nil {
+                       cl.dHT.SetIPBlockList(cl.ipBlockList)
+               }
        }
 
        return
@@ -1894,6 +1860,9 @@ func (cl *Client) setStorage(t *torrent, td data.Data) (err error) {
        if err != nil {
                return
        }
+       for index := range iter.N(t.numPieces()) {
+               cl.pieceChanged(t, index)
+       }
        cl.startTorrent(t)
        return
 }
@@ -1990,12 +1959,6 @@ func (t *torrent) addTrackers(announceList [][]string) {
        t.Trackers = newTrackers
 }
 
-// A handle to a live torrent within a Client.
-type Torrent struct {
-       cl *Client
-       *torrent
-}
-
 // Don't call this before the info is available.
 func (t *torrent) BytesCompleted() int64 {
        if !t.haveInfo() {
@@ -2014,23 +1977,6 @@ func (t Torrent) Drop() {
        t.cl.mu.Unlock()
 }
 
-// Provides access to regions of torrent data that correspond to its files.
-type File struct {
-       t      Torrent
-       path   string
-       offset int64
-       length int64
-       fi     metainfo.FileInfo
-}
-
-func (f File) FileInfo() metainfo.FileInfo {
-       return f.fi
-}
-
-func (f File) Path() string {
-       return f.path
-}
-
 // A file-like handle to some torrent data resource.
 type Handle interface {
        io.Reader
@@ -2039,114 +1985,6 @@ type Handle interface {
        io.ReaderAt
 }
 
-// Implements a Handle within a subsection of another Handle.
-type sectionHandle struct {
-       h           Handle
-       off, n, cur int64
-}
-
-func (me *sectionHandle) Seek(offset int64, whence int) (ret int64, err error) {
-       if whence == 0 {
-               offset += me.off
-       } else if whence == 2 {
-               whence = 0
-               offset += me.off + me.n
-       }
-       ret, err = me.h.Seek(offset, whence)
-       me.cur = ret
-       ret -= me.off
-       return
-}
-
-func (me *sectionHandle) Close() error {
-       return me.h.Close()
-}
-
-func (me *sectionHandle) Read(b []byte) (n int, err error) {
-       max := me.off + me.n - me.cur
-       if int64(len(b)) > max {
-               b = b[:max]
-       }
-       n, err = me.h.Read(b)
-       me.cur += int64(n)
-       if err != nil {
-               return
-       }
-       if me.cur == me.off+me.n {
-               err = io.EOF
-       }
-       return
-}
-
-func (me *sectionHandle) ReadAt(b []byte, off int64) (n int, err error) {
-       if off >= me.n {
-               err = io.EOF
-               return
-       }
-       if int64(len(b)) >= me.n-off {
-               b = b[:me.n-off]
-       }
-       return me.h.ReadAt(b, me.off+off)
-}
-
-func (f File) Open() (h Handle, err error) {
-       h = f.t.NewReadHandle()
-       _, err = h.Seek(f.offset, os.SEEK_SET)
-       if err != nil {
-               h.Close()
-               return
-       }
-       h = &sectionHandle{h, f.offset, f.Length(), f.offset}
-       return
-}
-
-func (f File) ReadAt(p []byte, off int64) (n int, err error) {
-       maxLen := f.length - off
-       if int64(len(p)) > maxLen {
-               p = p[:maxLen]
-       }
-       return f.t.ReadAt(p, off+f.offset)
-}
-
-func (f *File) Length() int64 {
-       return f.length
-}
-
-type FilePieceState struct {
-       Length int64
-       State  byte
-}
-
-func (f *File) Progress() (ret []FilePieceState) {
-       pieceSize := int64(f.t.usualPieceSize())
-       off := f.offset % pieceSize
-       remaining := f.length
-       for i := int(f.offset / pieceSize); ; i++ {
-               if remaining == 0 {
-                       break
-               }
-               len1 := pieceSize - off
-               if len1 > remaining {
-                       len1 = remaining
-               }
-               ret = append(ret, FilePieceState{len1, f.t.pieceStatusChar(i)})
-               off = 0
-               remaining -= len1
-       }
-       return
-}
-
-func (f *File) PrioritizeRegion(off, len int64) {
-       if off < 0 || off >= f.length {
-               return
-       }
-       if off+len > f.length {
-               len = f.length - off
-       }
-       off += f.offset
-       f.t.SetRegionPriority(off, len)
-}
-
 // Returns handles to the files in the torrent. This requires the metainfo is
 // available first.
 func (t Torrent) Files() (ret []File) {
@@ -2200,10 +2038,6 @@ func (t Torrent) DownloadAll() {
        t.cl.raisePiecePriority(t.torrent, t.numPieces()-1, piecePriorityReadahead)
 }
 
-func (me Torrent) ReadAt(p []byte, off int64) (n int, err error) {
-       return me.cl.torrentReadAt(me.torrent, off, p)
-}
-
 // Returns nil metainfo if it isn't in the cache. Checks that the retrieved
 // metainfo has the correct infohash.
 func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) {
@@ -2612,11 +2446,17 @@ func (me *Client) fillRequests(t *torrent, c *connection) {
                }
        }
        addRequest := func(req request) (again bool) {
+               // TODO: Couldn't this check also be done *after* the request?
                if len(c.Requests) >= 64 {
                        return false
                }
                return c.Request(req)
        }
+       for req := range t.urgent {
+               if !addRequest(req) {
+                       return
+               }
+       }
        for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
                pieceIndex := e.Piece()
                if !c.PeerHasPiece(pieceIndex) {
@@ -2664,7 +2504,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
        piece := t.Pieces[req.Index]
 
        // Do we actually want this chunk?
-       if _, ok := piece.PendingChunkSpecs[req.chunkSpec]; !ok || piece.Priority == piecePriorityNone {
+       if !t.wantChunk(req) {
                unusedDownloadedChunksCount.Add(1)
                c.UnwantedChunksReceived++
                return nil
@@ -2679,8 +2519,11 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
                return fmt.Errorf("error writing chunk: %s", err)
        }
 
+       // log.Println("got chunk", req)
+       piece.Event.Broadcast()
        // Record that we have the chunk.
        delete(piece.PendingChunkSpecs, req.chunkSpec)
+       delete(t.urgent, req)
        if len(piece.PendingChunkSpecs) == 0 {
                for _, c := range t.Conns {
                        c.pieceRequestOrder.DeletePiece(int(req.Index))
@@ -2717,18 +2560,24 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
        me.pieceChanged(t, int(piece))
 }
 
+// TODO: Check this isn't called more than once for each piece being correct.
 func (me *Client) pieceChanged(t *torrent, piece int) {
        correct := t.pieceComplete(piece)
        p := t.Pieces[piece]
        if correct {
                p.Priority = piecePriorityNone
                p.PendingChunkSpecs = nil
+               for req := range t.urgent {
+                       if int(req.Index) == piece {
+                               delete(t.urgent, req)
+                       }
+               }
                p.Event.Broadcast()
        } else {
                if len(p.PendingChunkSpecs) == 0 {
                        t.pendAllChunkSpecs(int(piece))
                }
-               if p.Priority != piecePriorityNone {
+               if t.wantPiece(piece) {
                        me.openNewConns(t)
                }
        }