6 pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
// DownloadStrategy is the set of callbacks through which a strategy chooses
// which requests to issue on a connection and is notified about torrent
// lifecycle and data-arrival events. DefaultDownloadStrategy and
// responsiveDownloadStrategy in this file both define methods with these
// names.
9 type DownloadStrategy interface {
// FillRequests is given a torrent and one of its connections so the strategy
// can issue requests on that connection.
10 FillRequests(t *torrent, c *connection)
// TorrentStarted and TorrentStopped bracket the period during which the
// strategy keeps per-torrent state for t.
11 TorrentStarted(t *torrent)
12 TorrentStopped(t *torrent)
13 DeleteRequest(t *torrent, r request)
// TorrentPrioritize receives a byte offset and length within t; how the range
// is used is up to the implementation.
14 TorrentPrioritize(t *torrent, off, _len int64)
// TorrentGotChunk and TorrentGotPiece report a received chunk (as the request
// that described it) and a piece index respectively.
15 TorrentGotChunk(t *torrent, r request)
16 TorrentGotPiece(t *torrent, piece int)
// DefaultDownloadStrategy is a DownloadStrategy implementation whose
// FillRequests compares a per-request integer ("heat") against a series of
// increasing thresholds.
19 type DefaultDownloadStrategy struct {
// heat maps each started torrent to a map from request to an integer count.
// The inner map is created in TorrentStarted.
// NOTE(review): presumably the count is how many connections currently have
// the request outstanding — confirm against the code that updates it.
20 heat map[*torrent]map[request]int
// FillRequests issues requests for torrent t on connection c.
//
// It first checks whether c already holds at least half (rounded up) of the
// peer's maximum request count. It then walks t.PiecesByBytesLeft once per
// heat threshold (1, 4, 15, 60), taking each piece's pending chunks in
// shuffled order and comparing each candidate request's heat against the
// current threshold.
//
// NOTE(review): several statements of this function (including the definition
// of th and the action taken after each check) are not visible here; confirm
// them against the full source before relying on this summary.
23 func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
// Threshold is (PeerMaxRequests+1)/2, i.e. half the peer's limit rounded up.
28 if len(c.Requests) >= (c.PeerMaxRequests+1)/2 {
// addRequest considers a single request; its named result "again" is assigned
// from c.Request below.
33 addRequest := func(req request) (again bool) {
34 piece := t.Pieces[req.Index]
35 if piece.Hashing || piece.QueuedForHash {
36 // We can't be sure we want this.
40 // We already have this.
// The request is checked for being pending both before and after c.Request.
43 if c.RequestPending(req) {
46 again = c.Request(req)
47 if c.RequestPending(req) {
52 // First request prioritized chunks.
53 // for e := t.Priorities.Front(); e != nil; e = e.Next() {
54 // if !addRequest(e.Value.(request)) {
58 // Then finish off incomplete pieces in order of bytes remaining.
// Each pass uses a larger heat threshold than the previous one.
59 for _, heatThreshold := range []int{1, 4, 15, 60} {
60 for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() {
61 pieceIndex := pp.Integer(e.Value.(int))
62 for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() {
63 // for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
64 r := request{pieceIndex, chunkSpec}
// NOTE(review): th is presumably s.heat[t]; its definition is not visible here.
65 if th[r] >= heatThreshold {
// TorrentStarted sets up the strategy's state for t by creating t's entry in
// s.heat, sized by t.ChunkCount(). It can panic with "torrent already
// started".
// NOTE(review): the conditions guarding the panic and the allocation of
// s.heat itself are not visible here — presumably the panic fires when t
// already has an entry, and s.heat is allocated only when nil; confirm.
76 func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) {
78 panic("torrent already started")
81 s.heat = make(map[*torrent]map[request]int, 10)
83 s.heat[t] = make(map[request]int, t.ChunkCount())
// TorrentStopped panics with "torrent not yet started" when t has no entry in
// s.heat.
// NOTE(review): the rest of the body is not visible here; presumably it
// removes t's entry from s.heat — confirm.
86 func (s *DefaultDownloadStrategy) TorrentStopped(t *torrent) {
87 if _, ok := s.heat[t]; !ok {
88 panic("torrent not yet started")
// DeleteRequest is called for request r of torrent t. It can panic with
// "no pending requests".
// NOTE(review): the condition guarding the panic and any update to s.heat are
// not visible here — presumably this decrements the heat for r; confirm.
93 func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) {
96 panic("no pending requests")
// TorrentGotChunk is a no-op for the default strategy; the body is empty.
101 func (me *DefaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) {}
// TorrentGotPiece is a no-op for the default strategy; the body is empty.
102 func (me *DefaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {}
// TorrentPrioritize is a no-op for the default strategy; the byte range
// given by off and _len is ignored.
103 func (*DefaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {}
// NewResponsiveDownloadStrategy returns a responsiveDownloadStrategy with its
// Readahead field set to readahead and its per-torrent maps (lastReadPiece
// and priorities) allocated but empty.
105 func NewResponsiveDownloadStrategy(readahead int) *responsiveDownloadStrategy {
106 return &responsiveDownloadStrategy{
107 Readahead: readahead,
108 lastReadPiece: make(map[*torrent]int),
109 priorities: make(map[*torrent]*list.List),
// responsiveDownloadStrategy keeps, per torrent, the piece index recorded by
// the most recent TorrentPrioritize call and an ordered list of prioritized
// requests. Its FillRequests serves that list first, then a readahead window
// starting at the recorded piece, then partially downloaded pieces.
113 type responsiveDownloadStrategy struct {
114 // How many bytes to preemptively download starting at the beginning of
115 // the last piece read for a given torrent.
// lastReadPiece holds, per torrent, the piece index of the first request
// collected by the latest TorrentPrioritize call that collected any.
117 lastReadPiece map[*torrent]int
// priorities holds, per torrent, a list whose elements are request values;
// the list is created in TorrentStarted.
118 priorities map[*torrent]*list.List
// TorrentStarted creates an empty priority list for t, replacing any list
// already stored under that key.
121 func (me *responsiveDownloadStrategy) TorrentStarted(t *torrent) {
122 me.priorities[t] = list.New()
// TorrentStopped discards the strategy's per-torrent state for t: its last
// read piece and its priority list. Deleting a missing key is harmless, so
// this is safe to call for a torrent that was never started.
125 func (me *responsiveDownloadStrategy) TorrentStopped(t *torrent) {
126 delete(me.lastReadPiece, t)
127 delete(me.priorities, t)
// DeleteRequest is a no-op for the responsive strategy; the body is empty.
// Note that, unlike the other methods on this type, it is declared on the
// value receiver rather than the pointer receiver.
129 func (responsiveDownloadStrategy) DeleteRequest(*torrent, request) {}
// FillRequests issues requests for torrent t on connection c in three phases:
//
//  1. requests from t's priority list, skipping entries whose chunk is no
//     longer among the piece's pending chunk specs;
//  2. if a last read piece is recorded for t, pending chunks of the pieces
//     from that piece onward, covering Readahead bytes rounded up to whole
//     pieces and bounded by the number of pieces;
//  3. pending chunks of pieces from t.PiecesByBytesLeft, stopping at the
//     first piece that is not partially downloaded.
//
// It first checks whether c already holds at least half (rounded up) of the
// peer's maximum request count, or at least 64 requests.
//
// NOTE(review): the bodies of several conditionals are not visible here
// (presumably early returns when the connection is full); confirm against the
// full source.
131 func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
132 if len(c.Requests) >= (c.PeerMaxRequests+1)/2 || len(c.Requests) >= 64 {
136 // Short circuit request fills at a level that might reduce receiving of
137 // unnecessary chunks.
138 requestWrapper := func(r request) bool {
139 if len(c.Requests) >= 64 {
// Phase 1: explicitly prioritized requests, in list order.
145 prios := me.priorities[t]
146 for e := prios.Front(); e != nil; e = e.Next() {
147 req := e.Value.(request)
// Entries whose chunk is no longer pending are detected here.
148 if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
151 if !requestWrapper(e.Value.(request)) {
// Phase 2: readahead window starting at the last read piece.
156 if lastReadPiece, ok := me.lastReadPiece[t]; ok {
// Ceiling division: number of pieces needed to cover Readahead bytes.
157 readaheadPieces := (me.Readahead + t.UsualPieceSize() - 1) / t.UsualPieceSize()
158 for i := lastReadPiece; i < lastReadPiece+readaheadPieces && i < t.NumPieces(); i++ {
159 for _, cs := range t.Pieces[i].shuffledPendingChunkSpecs() {
160 if !requestWrapper(request{pp.Integer(i), cs}) {
166 // Then finish off incomplete pieces in order of bytes remaining.
167 for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() {
168 index := e.Value.(int)
169 // Stop when we're onto untouched pieces.
170 if !t.PiecePartiallyDownloaded(index) {
173 // Request chunks in random order to reduce overlap with other
175 for _, cs := range t.Pieces[index].shuffledPendingChunkSpecs() {
176 if !requestWrapper(request{pp.Integer(index), cs}) {
// TorrentGotChunk scans t's priority list for elements equal to req.
// The loop advances through a separately held "next" element rather than
// calling e.Next() in the post statement.
// NOTE(review): the assignment of next and the action on a match are not
// visible here — presumably next is captured before the matching element is
// removed from the list; confirm.
183 func (me *responsiveDownloadStrategy) TorrentGotChunk(t *torrent, req request) {
184 prios := me.priorities[t]
185 var next *list.Element
186 for e := prios.Front(); e != nil; e = next {
188 if e.Value.(request) == req {
// TorrentGotPiece scans t's priority list for requests whose piece index
// equals piece. The loop advances through a separately held "next" element
// rather than calling e.Next() in the post statement.
// NOTE(review): the assignment of next and the action on a match are not
// visible here — presumably matching elements are removed from the list;
// confirm.
194 func (me *responsiveDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {
195 var next *list.Element
196 prios := me.priorities[t]
197 for e := prios.Front(); e != nil; e = next {
199 if int(e.Value.(request).Index) == piece {
// TorrentPrioritize turns the byte range starting at off with length _len
// into chunk requests for torrent t and queues them in t's priority list.
//
// Requests are collected by repeatedly mapping the current offset to a
// request with t.offsetRequest, then advancing past that request. Requests
// for pieces that t.wantPiece rejects are detected before the append. If any
// requests were collected, the first one's piece index is recorded as the
// last read piece; the first request is pushed to the front of the priority
// list and the remaining ones to the back, each only if t.wantChunk accepts
// it.
//
// NOTE(review): the loop header, the handling of a failed offsetRequest, and
// the alignment adjustment are not visible here; confirm against the full
// source.
205 func (s *responsiveDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {
// Capacity is _len divided by chunkSize, rounded up.
206 newPriorities := make([]request, 0, (_len+chunkSize-1)/chunkSize)
208 req, ok := t.offsetRequest(off)
212 reqOff := t.requestOffset(req)
213 // Gain the alignment adjustment.
215 // Lose the length of this block.
216 _len -= int64(req.Length)
// Continue from the end of the request just produced.
217 off = reqOff + int64(req.Length)
218 if !t.wantPiece(int(req.Index)) {
221 newPriorities = append(newPriorities, req)
223 if len(newPriorities) == 0 {
226 s.lastReadPiece[t] = int(newPriorities[0].Index)
// The first request goes to the front of the list; the rest go to the back.
227 if t.wantChunk(newPriorities[0]) {
228 s.priorities[t].PushFront(newPriorities[0])
230 for _, req := range newPriorities[1:] {
231 if t.wantChunk(req) {
232 s.priorities[t].PushBack(req)