}
}
-// Read torrent data at the given offset. Will block until it is available.
-func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err error) {
- cl.mu.Lock()
- defer cl.mu.Unlock()
- index := int(off / int64(t.usualPieceSize()))
- // Reading outside the bounds of a file is an error.
- if index < 0 {
- err = os.ErrInvalid
- return
- }
- if int(index) >= len(t.Pieces) {
- err = io.EOF
- return
- }
- pieceOff := pp.Integer(off % int64(t.usualPieceSize()))
- pieceLeft := int(t.pieceLength(index) - pieceOff)
- if pieceLeft <= 0 {
- err = io.EOF
- return
- }
- if len(p) > pieceLeft {
- p = p[:pieceLeft]
- }
- if len(p) == 0 {
- panic(len(p))
- }
- // TODO: ReadAt should always try to fill the buffer.
- for {
- avail := cl.prepareRead(t, off)
- if avail < int64(len(p)) {
- p = p[:avail]
- }
- n, err = dataReadAt(t.data, p, off)
- if n != 0 || err != io.ErrUnexpectedEOF {
- break
- }
- // If we reach here, the data we thought was ready, isn't. So we
- // prepare it again, and retry.
- }
- return
-}
-
-// Sets priorities to download from the given offset. Returns when the piece
-// at the given offset can be read. Returns the number of bytes that are
-// immediately available from the offset.
-func (cl *Client) prepareRead(t *torrent, off int64) (n int64) {
- index := int(off / int64(t.usualPieceSize()))
- // Reading outside the bounds of a file is an error.
- if index < 0 || index >= t.numPieces() {
- return
- }
- piece := t.Pieces[index]
- cl.readRaisePiecePriorities(t, off)
- for !t.pieceComplete(index) && !t.isClosed() {
- // This is to prevent being starved if a piece is dropped before we
- // can read it.
- cl.readRaisePiecePriorities(t, off)
- piece.Event.Wait()
- }
- return t.Info.Piece(index).Length() - off%t.Info.PieceLength
-}
-
-func (T Torrent) prepareRead(off int64) (avail int64) {
- T.cl.mu.Lock()
- defer T.cl.mu.Unlock()
- return T.cl.prepareRead(T.torrent, off)
-}
-
-// Data implements a streaming interface that's more efficient than ReadAt.
+// A Data that implements this has a streaming interface that should be
+// preferred over ReadAt. For example, the data is stored in blocks on the
+// network and have a fixed cost to open.
type SectionOpener interface {
+ // Open a ReadCloser at the given offset into torrent data. n is how many
+ // bytes we intend to read.
OpenSection(off, n int64) (io.ReadCloser, error)
}
func dataReadAt(d data.Data, b []byte, off int64) (n int, err error) {
+ // defer func() {
+ // if err == io.ErrUnexpectedEOF && n != 0 {
+ // err = nil
+ // }
+ // }()
+ // log.Println("data read at", len(b), off)
again:
if ra, ok := d.(io.ReaderAt); ok {
return ra.ReadAt(b, off)
return int((readahead+pieceLength-1)/pieceLength - 1)
}
-func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) {
+func (cl *Client) readRaisePiecePriorities(t *torrent, off, readaheadBytes int64) {
index := int(off / int64(t.usualPieceSize()))
cl.raisePiecePriority(t, index, piecePriorityNow)
index++
return
}
cl.raisePiecePriority(t, index, piecePriorityNext)
- for range iter.N(readaheadPieces(5*1024*1024, t.Info.PieceLength)) {
+ for range iter.N(readaheadPieces(readaheadBytes, t.Info.PieceLength)) {
index++
if index >= t.numPieces() {
break
}
}
+func (cl *Client) addUrgentRequests(t *torrent, off int64, n int) {
+ for n > 0 {
+ req, ok := t.offsetRequest(off)
+ if !ok {
+ break
+ }
+ if _, ok := t.urgent[req]; !ok && !t.haveChunk(req) {
+ if t.urgent == nil {
+ t.urgent = make(map[request]struct{}, (n+chunkSize-1)/chunkSize)
+ }
+ t.urgent[req] = struct{}{}
+ cl.event.Broadcast() // Why?
+ index := int(req.Index)
+ cl.queueFirstHash(t, index)
+ cl.pieceChanged(t, index)
+ }
+ reqOff := t.requestOffset(req)
+ n1 := req.Length - pp.Integer(off-reqOff)
+ off += int64(n1)
+ n -= int(n1)
+ }
+ // log.Print(t.urgent)
+}
+
func (cl *Client) configDir() string {
if cl._configDir == "" {
return filepath.Join(os.Getenv("HOME"), ".config/torrent")
dhtCfg.Conn = cl.utpSock.PacketConn()
}
cl.dHT, err = dht.NewServer(dhtCfg)
- if cl.ipBlockList != nil {
- cl.dHT.SetIPBlockList(cl.ipBlockList)
- }
if err != nil {
return
}
+ if cl.ipBlockList != nil {
+ cl.dHT.SetIPBlockList(cl.ipBlockList)
+ }
}
return
if err != nil {
return
}
+ for index := range iter.N(t.numPieces()) {
+ cl.pieceChanged(t, index)
+ }
cl.startTorrent(t)
return
}
t.Trackers = newTrackers
}
-// A handle to a live torrent within a Client.
-type Torrent struct {
- cl *Client
- *torrent
-}
-
// Don't call this before the info is available.
func (t *torrent) BytesCompleted() int64 {
if !t.haveInfo() {
t.cl.mu.Unlock()
}
-// Provides access to regions of torrent data that correspond to its files.
-type File struct {
- t Torrent
- path string
- offset int64
- length int64
- fi metainfo.FileInfo
-}
-
-func (f File) FileInfo() metainfo.FileInfo {
- return f.fi
-}
-
-func (f File) Path() string {
- return f.path
-}
-
// A file-like handle to some torrent data resource.
type Handle interface {
io.Reader
io.ReaderAt
}
-// Implements a Handle within a subsection of another Handle.
-type sectionHandle struct {
- h Handle
- off, n, cur int64
-}
-
-func (me *sectionHandle) Seek(offset int64, whence int) (ret int64, err error) {
- if whence == 0 {
- offset += me.off
- } else if whence == 2 {
- whence = 0
- offset += me.off + me.n
- }
- ret, err = me.h.Seek(offset, whence)
- me.cur = ret
- ret -= me.off
- return
-}
-
-func (me *sectionHandle) Close() error {
- return me.h.Close()
-}
-
-func (me *sectionHandle) Read(b []byte) (n int, err error) {
- max := me.off + me.n - me.cur
- if int64(len(b)) > max {
- b = b[:max]
- }
- n, err = me.h.Read(b)
- me.cur += int64(n)
- if err != nil {
- return
- }
- if me.cur == me.off+me.n {
- err = io.EOF
- }
- return
-}
-
-func (me *sectionHandle) ReadAt(b []byte, off int64) (n int, err error) {
- if off >= me.n {
- err = io.EOF
- return
- }
- if int64(len(b)) >= me.n-off {
- b = b[:me.n-off]
- }
- return me.h.ReadAt(b, me.off+off)
-}
-
-func (f File) Open() (h Handle, err error) {
- h = f.t.NewReadHandle()
- _, err = h.Seek(f.offset, os.SEEK_SET)
- if err != nil {
- h.Close()
- return
- }
- h = §ionHandle{h, f.offset, f.Length(), f.offset}
- return
-}
-
-func (f File) ReadAt(p []byte, off int64) (n int, err error) {
- maxLen := f.length - off
- if int64(len(p)) > maxLen {
- p = p[:maxLen]
- }
- return f.t.ReadAt(p, off+f.offset)
-}
-
-func (f *File) Length() int64 {
- return f.length
-}
-
-type FilePieceState struct {
- Length int64
- State byte
-}
-
-func (f *File) Progress() (ret []FilePieceState) {
- pieceSize := int64(f.t.usualPieceSize())
- off := f.offset % pieceSize
- remaining := f.length
- for i := int(f.offset / pieceSize); ; i++ {
- if remaining == 0 {
- break
- }
- len1 := pieceSize - off
- if len1 > remaining {
- len1 = remaining
- }
- ret = append(ret, FilePieceState{len1, f.t.pieceStatusChar(i)})
- off = 0
- remaining -= len1
- }
- return
-}
-
-func (f *File) PrioritizeRegion(off, len int64) {
- if off < 0 || off >= f.length {
- return
- }
- if off+len > f.length {
- len = f.length - off
- }
- off += f.offset
- f.t.SetRegionPriority(off, len)
-}
-
// Returns handles to the files in the torrent. This requires the metainfo is
// available first.
func (t Torrent) Files() (ret []File) {
t.cl.raisePiecePriority(t.torrent, t.numPieces()-1, piecePriorityReadahead)
}
-func (me Torrent) ReadAt(p []byte, off int64) (n int, err error) {
- return me.cl.torrentReadAt(me.torrent, off, p)
-}
-
// Returns nil metainfo if it isn't in the cache. Checks that the retrieved
// metainfo has the correct infohash.
func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) {
}
}
addRequest := func(req request) (again bool) {
+ // TODO: Couldn't this check also be done *after* the request?
if len(c.Requests) >= 64 {
return false
}
return c.Request(req)
}
+ for req := range t.urgent {
+ if !addRequest(req) {
+ return
+ }
+ }
for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
pieceIndex := e.Piece()
if !c.PeerHasPiece(pieceIndex) {
piece := t.Pieces[req.Index]
// Do we actually want this chunk?
- if _, ok := piece.PendingChunkSpecs[req.chunkSpec]; !ok || piece.Priority == piecePriorityNone {
+ if !t.wantChunk(req) {
unusedDownloadedChunksCount.Add(1)
c.UnwantedChunksReceived++
return nil
return fmt.Errorf("error writing chunk: %s", err)
}
+ // log.Println("got chunk", req)
+ piece.Event.Broadcast()
// Record that we have the chunk.
delete(piece.PendingChunkSpecs, req.chunkSpec)
+ delete(t.urgent, req)
if len(piece.PendingChunkSpecs) == 0 {
for _, c := range t.Conns {
c.pieceRequestOrder.DeletePiece(int(req.Index))
me.pieceChanged(t, int(piece))
}
+// TODO: Check this isn't called more than once for each piece being correct.
func (me *Client) pieceChanged(t *torrent, piece int) {
correct := t.pieceComplete(piece)
p := t.Pieces[piece]
if correct {
p.Priority = piecePriorityNone
p.PendingChunkSpecs = nil
+ for req := range t.urgent {
+ if int(req.Index) == piece {
+ delete(t.urgent, req)
+ }
+ }
p.Event.Broadcast()
} else {
if len(p.PendingChunkSpecs) == 0 {
t.pendAllChunkSpecs(int(piece))
}
- if p.Priority != piecePriorityNone {
+ if t.wantPiece(piece) {
me.openNewConns(t)
}
}