]> Sergey Matveev's repositories - btrtrc.git/blob - piece.go
Extract the request strategy logic
[btrtrc.git] / piece.go
1 package torrent
2
3 import (
4         "fmt"
5         "sync"
6
7         "github.com/anacrolix/missinggo/v2/bitmap"
8
9         "github.com/anacrolix/torrent/metainfo"
10         pp "github.com/anacrolix/torrent/peer_protocol"
11         "github.com/anacrolix/torrent/storage"
12 )
13
14 // Describes the importance of obtaining a particular piece.
15 type piecePriority byte
16
17 func (pp *piecePriority) Raise(maybe piecePriority) bool {
18         if maybe > *pp {
19                 *pp = maybe
20                 return true
21         }
22         return false
23 }
24
25 // Priority for use in PriorityBitmap
26 func (me piecePriority) BitmapPriority() int {
27         return -int(me)
28 }
29
30 const (
31         PiecePriorityNone      piecePriority = iota // Not wanted. Must be the zero value.
32         PiecePriorityNormal                         // Wanted.
33         PiecePriorityHigh                           // Wanted a lot.
34         PiecePriorityReadahead                      // May be required soon.
35         // Succeeds a piece where a read occurred. Currently the same as Now,
36         // apparently due to issues with caching.
37         PiecePriorityNext
38         PiecePriorityNow // A Reader is reading in this piece. Highest urgency.
39 )
40
41 type Piece struct {
42         // The completed piece SHA1 hash, from the metainfo "pieces" field.
43         hash  *metainfo.Hash
44         t     *Torrent
45         index pieceIndex
46         files []*File
47         // Chunks we've written to since the last check. The chunk offset and
48         // length can be determined by the request chunkSize in use.
49         _dirtyChunks bitmap.Bitmap
50
51         hashing             bool
52         numVerifies         int64
53         storageCompletionOk bool
54
55         publicPieceState PieceState
56         priority         piecePriority
57
58         pendingWritesMutex sync.Mutex
59         pendingWrites      int
60         noPendingWrites    sync.Cond
61
62         // Connections that have written data to this piece since its last check.
63         // This can include connections that have closed.
64         dirtiers map[*connection]struct{}
65 }
66
67 func (p *Piece) String() string {
68         return fmt.Sprintf("%s/%d", p.t.infoHash.HexString(), p.index)
69 }
70
71 func (p *Piece) Info() metainfo.Piece {
72         return p.t.info.Piece(int(p.index))
73 }
74
75 func (p *Piece) Storage() storage.Piece {
76         return p.t.storage.Piece(p.Info())
77 }
78
79 func (p *Piece) pendingChunkIndex(chunkIndex int) bool {
80         return !p._dirtyChunks.Contains(chunkIndex)
81 }
82
83 func (p *Piece) pendingChunk(cs chunkSpec, chunkSize pp.Integer) bool {
84         return p.pendingChunkIndex(chunkIndex(cs, chunkSize))
85 }
86
87 func (p *Piece) hasDirtyChunks() bool {
88         return p._dirtyChunks.Len() != 0
89 }
90
91 func (p *Piece) numDirtyChunks() pp.Integer {
92         return pp.Integer(p._dirtyChunks.Len())
93 }
94
95 func (p *Piece) unpendChunkIndex(i int) {
96         p._dirtyChunks.Add(i)
97         p.t.tickleReaders()
98 }
99
100 func (p *Piece) pendChunkIndex(i int) {
101         p._dirtyChunks.Remove(i)
102 }
103
104 func (p *Piece) numChunks() pp.Integer {
105         return p.t.pieceNumChunks(p.index)
106 }
107
108 func (p *Piece) undirtiedChunkIndices() (ret bitmap.Bitmap) {
109         ret = p._dirtyChunks.Copy()
110         ret.FlipRange(0, bitmap.BitIndex(p.numChunks()))
111         return
112 }
113
114 func (p *Piece) incrementPendingWrites() {
115         p.pendingWritesMutex.Lock()
116         p.pendingWrites++
117         p.pendingWritesMutex.Unlock()
118 }
119
120 func (p *Piece) decrementPendingWrites() {
121         p.pendingWritesMutex.Lock()
122         if p.pendingWrites == 0 {
123                 panic("assertion")
124         }
125         p.pendingWrites--
126         if p.pendingWrites == 0 {
127                 p.noPendingWrites.Broadcast()
128         }
129         p.pendingWritesMutex.Unlock()
130 }
131
132 func (p *Piece) waitNoPendingWrites() {
133         p.pendingWritesMutex.Lock()
134         for p.pendingWrites != 0 {
135                 p.noPendingWrites.Wait()
136         }
137         p.pendingWritesMutex.Unlock()
138 }
139
140 func (p *Piece) chunkIndexDirty(chunk pp.Integer) bool {
141         return p._dirtyChunks.Contains(bitmap.BitIndex(chunk))
142 }
143
144 func (p *Piece) chunkIndexSpec(chunk pp.Integer) chunkSpec {
145         return chunkIndexSpec(chunk, p.length(), p.chunkSize())
146 }
147
148 func (p *Piece) numDirtyBytes() (ret pp.Integer) {
149         // defer func() {
150         //      if ret > p.length() {
151         //              panic("too many dirty bytes")
152         //      }
153         // }()
154         numRegularDirtyChunks := p.numDirtyChunks()
155         if p.chunkIndexDirty(p.numChunks() - 1) {
156                 numRegularDirtyChunks--
157                 ret += p.chunkIndexSpec(p.lastChunkIndex()).Length
158         }
159         ret += pp.Integer(numRegularDirtyChunks) * p.chunkSize()
160         return
161 }
162
163 func (p *Piece) length() pp.Integer {
164         return p.t.pieceLength(p.index)
165 }
166
167 func (p *Piece) chunkSize() pp.Integer {
168         return p.t.chunkSize
169 }
170
171 func (p *Piece) lastChunkIndex() pp.Integer {
172         return p.numChunks() - 1
173 }
174
175 func (p *Piece) bytesLeft() (ret pp.Integer) {
176         if p.t.pieceComplete(p.index) {
177                 return 0
178         }
179         return p.length() - p.numDirtyBytes()
180 }
181
182 // Forces the piece data to be rehashed.
183 func (p *Piece) VerifyData() {
184         p.t.cl.lock()
185         defer p.t.cl.unlock()
186         target := p.numVerifies + 1
187         if p.hashing {
188                 target++
189         }
190         //log.Printf("target: %d", target)
191         p.t.queuePieceCheck(p.index)
192         for {
193                 //log.Printf("got %d verifies", p.numVerifies)
194                 if p.numVerifies >= target {
195                         break
196                 }
197                 p.t.cl.event.Wait()
198         }
199         // log.Print("done")
200 }
201
202 func (p *Piece) queuedForHash() bool {
203         return p.t.piecesQueuedForHash.Get(bitmap.BitIndex(p.index))
204 }
205
206 func (p *Piece) torrentBeginOffset() int64 {
207         return int64(p.index) * p.t.info.PieceLength
208 }
209
210 func (p *Piece) torrentEndOffset() int64 {
211         return p.torrentBeginOffset() + int64(p.length())
212 }
213
214 func (p *Piece) SetPriority(prio piecePriority) {
215         p.t.cl.lock()
216         defer p.t.cl.unlock()
217         p.priority = prio
218         p.t.updatePiecePriority(p.index)
219 }
220
221 func (p *Piece) uncachedPriority() (ret piecePriority) {
222         if p.t.pieceComplete(p.index) || p.t.pieceQueuedForHash(p.index) || p.t.hashingPiece(p.index) {
223                 return PiecePriorityNone
224         }
225         for _, f := range p.files {
226                 ret.Raise(f.prio)
227         }
228         if p.t.readerNowPieces().Contains(int(p.index)) {
229                 ret.Raise(PiecePriorityNow)
230         }
231         // if t._readerNowPieces.Contains(piece - 1) {
232         //      return PiecePriorityNext
233         // }
234         if p.t.readerReadaheadPieces().Contains(bitmap.BitIndex(p.index)) {
235                 ret.Raise(PiecePriorityReadahead)
236         }
237         ret.Raise(p.priority)
238         return
239 }
240
241 func (p *Piece) completion() (ret storage.Completion) {
242         ret.Complete = p.t.pieceComplete(p.index)
243         ret.Ok = p.storageCompletionOk
244         return
245 }
246
247 func (p *Piece) allChunksDirty() bool {
248         return p._dirtyChunks.Len() == int(p.numChunks())
249 }
250
251 func (p *Piece) requestStrategyPiece() requestStrategyPiece {
252         return p
253 }
254
255 func (p *Piece) dirtyChunks() bitmap.Bitmap {
256         return p._dirtyChunks
257 }
258
259 func (p *Piece) wouldDuplicateRecent(cs chunkSpec) bool {
260         // This piece has been requested on another connection, and the duplicate request timer is still
261         // running.
262         _, ok := p.t.lastRequested[request{pp.Integer(p.index), cs}]
263         return ok
264 }