Sergey Matveev's repositories - btrtrc.git/blob - download_strategies.go
Clean up imports
[btrtrc.git] / download_strategies.go
1 package torrent
2
3 import (
4         "container/list"
5
6         pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
7 )
8
9 type DownloadStrategy interface {
10         FillRequests(t *torrent, c *connection)
11         TorrentStarted(t *torrent)
12         TorrentStopped(t *torrent)
13         DeleteRequest(t *torrent, r request)
14         TorrentPrioritize(t *torrent, off, _len int64)
15         TorrentGotChunk(t *torrent, r request)
16         TorrentGotPiece(t *torrent, piece int)
17 }
18
19 type DefaultDownloadStrategy struct {
20         heat map[*torrent]map[request]int
21 }
22
23 func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
24         if c.Interested {
25                 if c.PeerChoked {
26                         return
27                 }
28                 if len(c.Requests) >= (c.PeerMaxRequests+1)/2 {
29                         return
30                 }
31         }
32         th := s.heat[t]
33         addRequest := func(req request) (again bool) {
34                 piece := t.Pieces[req.Index]
35                 if piece.Hashing || piece.QueuedForHash {
36                         // We can't be sure we want this.
37                         return true
38                 }
39                 if piece.Complete() {
40                         // We already have this.
41                         return true
42                 }
43                 if c.RequestPending(req) {
44                         return true
45                 }
46                 again = c.Request(req)
47                 if c.RequestPending(req) {
48                         th[req]++
49                 }
50                 return
51         }
52         // First request prioritized chunks.
53         // for e := t.Priorities.Front(); e != nil; e = e.Next() {
54         //      if !addRequest(e.Value.(request)) {
55         //              return
56         //      }
57         // }
58         // Then finish off incomplete pieces in order of bytes remaining.
59         for _, heatThreshold := range []int{1, 4, 15, 60} {
60                 for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() {
61                         pieceIndex := pp.Integer(e.Value.(int))
62                         for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() {
63                                 // for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
64                                 r := request{pieceIndex, chunkSpec}
65                                 if th[r] >= heatThreshold {
66                                         continue
67                                 }
68                                 if !addRequest(r) {
69                                         return
70                                 }
71                         }
72                 }
73         }
74 }
75
76 func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) {
77         if s.heat[t] != nil {
78                 panic("torrent already started")
79         }
80         if s.heat == nil {
81                 s.heat = make(map[*torrent]map[request]int, 10)
82         }
83         s.heat[t] = make(map[request]int, t.ChunkCount())
84 }
85
86 func (s *DefaultDownloadStrategy) TorrentStopped(t *torrent) {
87         if _, ok := s.heat[t]; !ok {
88                 panic("torrent not yet started")
89         }
90         delete(s.heat, t)
91 }
92
93 func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) {
94         m := s.heat[t]
95         if m[r] <= 0 {
96                 panic("no pending requests")
97         }
98         m[r]--
99 }
100
101 func (me *DefaultDownloadStrategy) TorrentGotChunk(t *torrent, c request)      {}
102 func (me *DefaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int)      {}
103 func (*DefaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {}
104
105 func NewResponsiveDownloadStrategy(readahead int) *responsiveDownloadStrategy {
106         return &responsiveDownloadStrategy{
107                 Readahead:     readahead,
108                 lastReadPiece: make(map[*torrent]int),
109                 priorities:    make(map[*torrent]*list.List),
110         }
111 }
112
113 type responsiveDownloadStrategy struct {
114         // How many bytes to preemptively download starting at the beginning of
115         // the last piece read for a given torrent.
116         Readahead     int
117         lastReadPiece map[*torrent]int
118         priorities    map[*torrent]*list.List
119 }
120
121 func (me *responsiveDownloadStrategy) TorrentStarted(t *torrent) {
122         me.priorities[t] = list.New()
123 }
124
125 func (me *responsiveDownloadStrategy) TorrentStopped(t *torrent) {
126         delete(me.lastReadPiece, t)
127         delete(me.priorities, t)
128 }
129 func (responsiveDownloadStrategy) DeleteRequest(*torrent, request) {}
130
131 func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
132         if len(c.Requests) >= (c.PeerMaxRequests+1)/2 || len(c.Requests) >= 64 {
133                 return
134         }
135
136         // Short circuit request fills at a level that might reduce receiving of
137         // unnecessary chunks.
138         requestWrapper := func(r request) bool {
139                 if len(c.Requests) >= 64 {
140                         return false
141                 }
142                 return c.Request(r)
143         }
144
145         prios := me.priorities[t]
146         for e := prios.Front(); e != nil; e = e.Next() {
147                 req := e.Value.(request)
148                 if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
149                         panic(req)
150                 }
151                 if !requestWrapper(e.Value.(request)) {
152                         return
153                 }
154         }
155
156         if lastReadPiece, ok := me.lastReadPiece[t]; ok {
157                 readaheadPieces := (me.Readahead + t.UsualPieceSize() - 1) / t.UsualPieceSize()
158                 for i := lastReadPiece; i < lastReadPiece+readaheadPieces && i < t.NumPieces(); i++ {
159                         for _, cs := range t.Pieces[i].shuffledPendingChunkSpecs() {
160                                 if !requestWrapper(request{pp.Integer(i), cs}) {
161                                         return
162                                 }
163                         }
164                 }
165         }
166         // Then finish off incomplete pieces in order of bytes remaining.
167         for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() {
168                 index := e.Value.(int)
169                 // Stop when we're onto untouched pieces.
170                 if !t.PiecePartiallyDownloaded(index) {
171                         break
172                 }
173                 // Request chunks in random order to reduce overlap with other
174                 // connections.
175                 for _, cs := range t.Pieces[index].shuffledPendingChunkSpecs() {
176                         if !requestWrapper(request{pp.Integer(index), cs}) {
177                                 return
178                         }
179                 }
180         }
181 }
182
183 func (me *responsiveDownloadStrategy) TorrentGotChunk(t *torrent, req request) {
184         prios := me.priorities[t]
185         var next *list.Element
186         for e := prios.Front(); e != nil; e = next {
187                 next = e.Next()
188                 if e.Value.(request) == req {
189                         prios.Remove(e)
190                 }
191         }
192 }
193
194 func (me *responsiveDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {
195         var next *list.Element
196         prios := me.priorities[t]
197         for e := prios.Front(); e != nil; e = next {
198                 next = e.Next()
199                 if int(e.Value.(request).Index) == piece {
200                         prios.Remove(e)
201                 }
202         }
203 }
204
205 func (s *responsiveDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {
206         newPriorities := make([]request, 0, (_len+chunkSize-1)/chunkSize)
207         for _len > 0 {
208                 req, ok := t.offsetRequest(off)
209                 if !ok {
210                         panic("bad offset")
211                 }
212                 reqOff := t.requestOffset(req)
213                 // Gain the alignment adjustment.
214                 _len += off - reqOff
215                 // Lose the length of this block.
216                 _len -= int64(req.Length)
217                 off = reqOff + int64(req.Length)
218                 if !t.wantPiece(int(req.Index)) {
219                         continue
220                 }
221                 newPriorities = append(newPriorities, req)
222         }
223         if len(newPriorities) == 0 {
224                 return
225         }
226         s.lastReadPiece[t] = int(newPriorities[0].Index)
227         if t.wantChunk(newPriorities[0]) {
228                 s.priorities[t].PushFront(newPriorities[0])
229         }
230         for _, req := range newPriorities[1:] {
231                 if t.wantChunk(req) {
232                         s.priorities[t].PushBack(req)
233                 }
234         }
235 }