"bitbucket.org/anacrolix/go.torrent/dht"
"bitbucket.org/anacrolix/go.torrent/util"
"bufio"
- "container/list"
"crypto/rand"
"crypto/sha1"
"errors"
if !t.haveInfo() {
return errors.New("missing metadata")
}
- newPriorities := make([]request, 0, (len_+chunkSize-1)/chunkSize)
- for len_ > 0 {
- req, ok := t.offsetRequest(off)
- if !ok {
- return errors.New("bad offset")
- }
- reqOff := t.requestOffset(req)
- // Gain the alignment adjustment.
- len_ += off - reqOff
- // Lose the length of this block.
- len_ -= int64(req.Length)
- off = reqOff + int64(req.Length)
- if !t.wantPiece(int(req.Index)) {
- continue
- }
- newPriorities = append(newPriorities, req)
- }
- if len(newPriorities) == 0 {
- return nil
- }
- t.Priorities.PushFront(newPriorities[0])
- for _, req := range newPriorities[1:] {
- t.Priorities.PushBack(req)
- }
+ me.DownloadStrategy.TorrentPrioritize(t, off, len_)
for _, cn := range t.Conns {
me.replenishConnRequests(t, cn)
}
err = io.EOF
return
}
- t.lastReadPiece = int(index)
piece := t.Pieces[index]
pieceOff := pp.Integer(off % int64(t.PieceLength(0)))
high := int(t.PieceLength(index) - pieceOff)
}
// Unprioritize the chunk.
- var next *list.Element
- for e := t.Priorities.Front(); e != nil; e = next {
- next = e.Next()
- if e.Value.(request) == req {
- t.Priorities.Remove(e)
- }
- }
+ me.DownloadStrategy.TorrentGotChunk(t, req)
// Cancel pending requests for this chunk.
cancelled := false
p.EverHashed = true
if correct {
p.PendingChunkSpecs = nil
- // log.Printf("%s: got piece %d, (%d/%d)", t, piece, t.NumPiecesCompleted(), t.NumPieces())
- var next *list.Element
- for e := t.Priorities.Front(); e != nil; e = next {
- next = e.Next()
- if e.Value.(request).Index == piece {
- t.Priorities.Remove(e)
- }
- }
+ me.DownloadStrategy.TorrentGotPiece(t, int(piece))
me.dataReady(dataSpec{
t.InfoHash,
request{
import (
pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
+ "container/list"
)
type DownloadStrategy interface {
TorrentStarted(t *torrent)
TorrentStopped(t *torrent)
DeleteRequest(t *torrent, r request)
+ TorrentPrioritize(t *torrent, off, _len int64)
+ TorrentGotChunk(t *torrent, r request)
+ TorrentGotPiece(t *torrent, piece int)
}
type DefaultDownloadStrategy struct {
return
}
// First request prioritized chunks.
- for e := t.Priorities.Front(); e != nil; e = e.Next() {
- if !addRequest(e.Value.(request)) {
- return
- }
- }
+ // for e := t.Priorities.Front(); e != nil; e = e.Next() {
+ // if !addRequest(e.Value.(request)) {
+ // return
+ // }
+ // }
// Then finish off incomplete pieces in order of bytes remaining.
for _, heatThreshold := range []int{1, 4, 15, 60} {
for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() {
m[r]--
}
-type ResponsiveDownloadStrategy struct {
+func (me *DefaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) {}
+func (me *DefaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {}
+func (*DefaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {}
+
+func NewResponsiveDownloadStrategy(readahead int) *responsiveDownloadStrategy {
+ return &responsiveDownloadStrategy{
+ Readahead: readahead,
+ lastReadPiece: make(map[*torrent]int),
+ priorities: make(map[*torrent]*list.List),
+ }
+}
+
+type responsiveDownloadStrategy struct {
// How many bytes to preemptively download starting at the beginning of
// the last piece read for a given torrent.
- Readahead int
+ Readahead int
+ lastReadPiece map[*torrent]int
+ priorities map[*torrent]*list.List
}
-func (ResponsiveDownloadStrategy) TorrentStarted(*torrent) {}
-func (ResponsiveDownloadStrategy) TorrentStopped(*torrent) {}
-func (ResponsiveDownloadStrategy) DeleteRequest(*torrent, request) {}
+func (me *responsiveDownloadStrategy) TorrentStarted(t *torrent) {
+ me.priorities[t] = list.New()
+}
+
+func (me *responsiveDownloadStrategy) TorrentStopped(t *torrent) {
+ delete(me.lastReadPiece, t)
+ delete(me.priorities, t)
+}
+func (responsiveDownloadStrategy) DeleteRequest(*torrent, request) {}
-func (me *ResponsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
- for e := t.Priorities.Front(); e != nil; e = e.Next() {
+func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
+ if len(c.Requests) >= (c.PeerMaxRequests+1)/2 || len(c.Requests) >= 64 {
+ return
+ }
+
+ // Short circuit request fills at a level that might reduce receiving of
+ // unnecessary chunks.
+ requestWrapper := func(r request) bool {
+ if len(c.Requests) >= 64 {
+ return false
+ }
+ return c.Request(r)
+ }
+
+ prios := me.priorities[t]
+ for e := prios.Front(); e != nil; e = e.Next() {
req := e.Value.(request)
if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
panic(req)
}
- if !c.Request(e.Value.(request)) {
+ if !requestWrapper(e.Value.(request)) {
return
}
}
- readaheadPieces := (me.Readahead + t.UsualPieceSize() - 1) / t.UsualPieceSize()
- for i := t.lastReadPiece; i < t.lastReadPiece+readaheadPieces && i < t.NumPieces(); i++ {
- for _, cs := range t.Pieces[i].shuffledPendingChunkSpecs() {
- if !c.Request(request{pp.Integer(i), cs}) {
- return
+
+ if lastReadPiece, ok := me.lastReadPiece[t]; ok {
+ readaheadPieces := (me.Readahead + t.UsualPieceSize() - 1) / t.UsualPieceSize()
+ for i := lastReadPiece; i < lastReadPiece+readaheadPieces && i < t.NumPieces(); i++ {
+ for _, cs := range t.Pieces[i].shuffledPendingChunkSpecs() {
+ if !requestWrapper(request{pp.Integer(i), cs}) {
+ return
+ }
}
}
}
// Request chunks in random order to reduce overlap with other
// connections.
for _, cs := range t.Pieces[index].shuffledPendingChunkSpecs() {
- if !c.Request(request{pp.Integer(index), cs}) {
+ if !requestWrapper(request{pp.Integer(index), cs}) {
return
}
}
}
}
+
+func (me *responsiveDownloadStrategy) TorrentGotChunk(t *torrent, req request) {
+ prios := me.priorities[t]
+ var next *list.Element
+ for e := prios.Front(); e != nil; e = next {
+ next = e.Next()
+ if e.Value.(request) == req {
+ prios.Remove(e)
+ }
+ }
+}
+
+func (me *responsiveDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {
+ var next *list.Element
+ prios := me.priorities[t]
+ for e := prios.Front(); e != nil; e = next {
+ next = e.Next()
+ if int(e.Value.(request).Index) == piece {
+ prios.Remove(e)
+ }
+ }
+}
+
+func (s *responsiveDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {
+ newPriorities := make([]request, 0, (_len+chunkSize-1)/chunkSize)
+ for _len > 0 {
+ req, ok := t.offsetRequest(off)
+ if !ok {
+ panic("bad offset")
+ }
+ reqOff := t.requestOffset(req)
+ // Gain the alignment adjustment.
+ _len += off - reqOff
+ // Lose the length of this block.
+ _len -= int64(req.Length)
+ off = reqOff + int64(req.Length)
+ if !t.wantPiece(int(req.Index)) {
+ continue
+ }
+ newPriorities = append(newPriorities, req)
+ }
+ if len(newPriorities) == 0 {
+ return
+ }
+ s.lastReadPiece[t] = int(newPriorities[0].Index)
+ if t.wantChunk(newPriorities[0]) {
+ s.priorities[t].PushFront(newPriorities[0])
+ }
+ for _, req := range newPriorities[1:] {
+ if t.wantChunk(req) {
+ s.priorities[t].PushBack(req)
+ }
+ }
+}
PiecesByBytesLeft *OrderedList
Data mmap_span.MMapSpan
// Prevent mutations to Data memory maps while in use as they're not safe.
- dataLock sync.RWMutex
- Info *metainfo.Info
- Conns []*connection
- Peers []Peer
- Priorities *list.List
+ dataLock sync.RWMutex
+ Info *metainfo.Info
+ Conns []*connection
+ Peers []Peer
// BEP 12 Multitracker Metadata Extension. The tracker.Client instances
// mirror their respective URLs from the announce-list key.
- Trackers [][]tracker.Client
- lastReadPiece int
- DisplayName string
- MetaData []byte
- metadataHave []bool
+ Trackers [][]tracker.Client
+ DisplayName string
+ MetaData []byte
+ metadataHave []bool
}
func (t *torrent) InvalidateMetadata() {
piece.bytesLeftElement = t.PiecesByBytesLeft.Insert(index)
t.pendAllChunkSpecs(pp.Integer(index))
}
- t.Priorities = list.New()
for _, conn := range t.Conns {
if err := conn.setNumPieces(t.NumPieces()); err != nil {
log.Printf("closing connection: %s", err)
fmt.Fprintf(w, "%c", t.pieceStatusChar(index))
}
fmt.Fprintln(w)
- fmt.Fprintln(w, "Priorities: ")
- if t.Priorities != nil {
- for e := t.Priorities.Front(); e != nil; e = e.Next() {
- fmt.Fprintf(w, "\t%v\n", e.Value)
- }
- }
+ // fmt.Fprintln(w, "Priorities: ")
+ // if t.Priorities != nil {
+ // for e := t.Priorities.Front(); e != nil; e = e.Next() {
+ // fmt.Fprintf(w, "\t%v\n", e.Value)
+ // }
+ // }
fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers))
for _, c := range t.Conns {
c.WriteStatus(w)
return false
}
+func (t *torrent) wantChunk(r request) bool {
+ if !t.wantPiece(int(r.Index)) {
+ return false
+ }
+ _, ok := t.Pieces[r.Index].PendingChunkSpecs[r.chunkSpec]
+ return ok
+}
+
func (t *torrent) wantPiece(index int) bool {
if !t.haveInfo() {
return false