]> Sergey Matveev's repositories - btrtrc.git/blob - piece.go
Send hash requests for missing v2 hashes
[btrtrc.git] / piece.go
1 package torrent
2
3 import (
4         "fmt"
5         "sync"
6
7         "github.com/anacrolix/chansync"
8         g "github.com/anacrolix/generics"
9         "github.com/anacrolix/missinggo/v2/bitmap"
10
11         "github.com/anacrolix/torrent/metainfo"
12         pp "github.com/anacrolix/torrent/peer_protocol"
13         "github.com/anacrolix/torrent/storage"
14         infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2"
15 )
16
17 type Piece struct {
18         // The completed piece SHA1 hash, from the metainfo "pieces" field. Nil if the info is not V1
19         // compatible.
20         hash   *metainfo.Hash
21         hashV2 g.Option[infohash_v2.T]
22         t      *Torrent
23         index  pieceIndex
24         files  []*File
25
26         readerCond chansync.BroadcastCond
27
28         numVerifies         int64
29         hashing             bool
30         marking             bool
31         storageCompletionOk bool
32
33         publicPieceState PieceState
34         priority         piecePriority
35         // Availability adjustment for this piece relative to len(Torrent.connsWithAllPieces). This is
36         // incremented for any piece a peer has when a peer has a piece, Torrent.haveInfo is true, and
37         // the Peer isn't recorded in Torrent.connsWithAllPieces.
38         relativeAvailability int
39
40         // This can be locked when the Client lock is taken, but probably not vice versa.
41         pendingWritesMutex sync.Mutex
42         pendingWrites      int
43         noPendingWrites    sync.Cond
44
45         // Connections that have written data to this piece since its last check.
46         // This can include connections that have closed.
47         dirtiers map[*Peer]struct{}
48 }
49
50 func (p *Piece) String() string {
51         return fmt.Sprintf("%s/%d", p.t.canonicalShortInfohash().HexString(), p.index)
52 }
53
54 func (p *Piece) Info() metainfo.Piece {
55         return p.t.info.Piece(int(p.index))
56 }
57
58 func (p *Piece) Storage() storage.Piece {
59         return p.t.storage.Piece(p.Info())
60 }
61
62 func (p *Piece) Flush() {
63         if p.t.storage.Flush != nil {
64                 _ = p.t.storage.Flush()
65         }
66 }
67
68 func (p *Piece) pendingChunkIndex(chunkIndex chunkIndexType) bool {
69         return !p.chunkIndexDirty(chunkIndex)
70 }
71
72 func (p *Piece) pendingChunk(cs ChunkSpec, chunkSize pp.Integer) bool {
73         return p.pendingChunkIndex(chunkIndexFromChunkSpec(cs, chunkSize))
74 }
75
76 func (p *Piece) hasDirtyChunks() bool {
77         return p.numDirtyChunks() != 0
78 }
79
80 func (p *Piece) numDirtyChunks() chunkIndexType {
81         return chunkIndexType(roaringBitmapRangeCardinality[RequestIndex](
82                 &p.t.dirtyChunks,
83                 p.requestIndexOffset(),
84                 p.t.pieceRequestIndexOffset(p.index+1)))
85 }
86
87 func (p *Piece) unpendChunkIndex(i chunkIndexType) {
88         p.t.dirtyChunks.Add(p.requestIndexOffset() + i)
89         p.t.updatePieceRequestOrderPiece(p.index)
90         p.readerCond.Broadcast()
91 }
92
93 func (p *Piece) pendChunkIndex(i RequestIndex) {
94         p.t.dirtyChunks.Remove(p.requestIndexOffset() + i)
95         p.t.updatePieceRequestOrderPiece(p.index)
96 }
97
98 func (p *Piece) numChunks() chunkIndexType {
99         return p.t.pieceNumChunks(p.index)
100 }
101
102 func (p *Piece) incrementPendingWrites() {
103         p.pendingWritesMutex.Lock()
104         p.pendingWrites++
105         p.pendingWritesMutex.Unlock()
106 }
107
108 func (p *Piece) decrementPendingWrites() {
109         p.pendingWritesMutex.Lock()
110         if p.pendingWrites == 0 {
111                 panic("assertion")
112         }
113         p.pendingWrites--
114         if p.pendingWrites == 0 {
115                 p.noPendingWrites.Broadcast()
116         }
117         p.pendingWritesMutex.Unlock()
118 }
119
120 func (p *Piece) waitNoPendingWrites() {
121         p.pendingWritesMutex.Lock()
122         for p.pendingWrites != 0 {
123                 p.noPendingWrites.Wait()
124         }
125         p.pendingWritesMutex.Unlock()
126 }
127
128 func (p *Piece) chunkIndexDirty(chunk chunkIndexType) bool {
129         return p.t.dirtyChunks.Contains(p.requestIndexOffset() + chunk)
130 }
131
132 func (p *Piece) chunkIndexSpec(chunk chunkIndexType) ChunkSpec {
133         return chunkIndexSpec(pp.Integer(chunk), p.length(), p.chunkSize())
134 }
135
136 func (p *Piece) numDirtyBytes() (ret pp.Integer) {
137         // defer func() {
138         //      if ret > p.length() {
139         //              panic("too many dirty bytes")
140         //      }
141         // }()
142         numRegularDirtyChunks := p.numDirtyChunks()
143         if p.chunkIndexDirty(p.numChunks() - 1) {
144                 numRegularDirtyChunks--
145                 ret += p.chunkIndexSpec(p.lastChunkIndex()).Length
146         }
147         ret += pp.Integer(numRegularDirtyChunks) * p.chunkSize()
148         return
149 }
150
151 func (p *Piece) length() pp.Integer {
152         return p.t.pieceLength(p.index)
153 }
154
155 func (p *Piece) chunkSize() pp.Integer {
156         return p.t.chunkSize
157 }
158
159 func (p *Piece) lastChunkIndex() chunkIndexType {
160         return p.numChunks() - 1
161 }
162
163 func (p *Piece) bytesLeft() (ret pp.Integer) {
164         if p.t.pieceComplete(p.index) {
165                 return 0
166         }
167         return p.length() - p.numDirtyBytes()
168 }
169
170 // Forces the piece data to be rehashed.
171 func (p *Piece) VerifyData() {
172         p.t.cl.lock()
173         defer p.t.cl.unlock()
174         target := p.numVerifies + 1
175         if p.hashing {
176                 target++
177         }
178         // log.Printf("target: %d", target)
179         p.t.queuePieceCheck(p.index)
180         for {
181                 // log.Printf("got %d verifies", p.numVerifies)
182                 if p.numVerifies >= target {
183                         break
184                 }
185                 p.t.cl.event.Wait()
186         }
187         // log.Print("done")
188 }
189
190 func (p *Piece) queuedForHash() bool {
191         return p.t.piecesQueuedForHash.Get(bitmap.BitIndex(p.index))
192 }
193
194 func (p *Piece) torrentBeginOffset() int64 {
195         return int64(p.index) * p.t.info.PieceLength
196 }
197
198 func (p *Piece) torrentEndOffset() int64 {
199         return p.torrentBeginOffset() + int64(p.t.usualPieceSize())
200 }
201
202 func (p *Piece) SetPriority(prio piecePriority) {
203         p.t.cl.lock()
204         defer p.t.cl.unlock()
205         p.priority = prio
206         p.t.updatePiecePriority(p.index, "Piece.SetPriority")
207 }
208
209 func (p *Piece) purePriority() (ret piecePriority) {
210         for _, f := range p.files {
211                 ret.Raise(f.prio)
212         }
213         if p.t.readerNowPieces().Contains(bitmap.BitIndex(p.index)) {
214                 ret.Raise(PiecePriorityNow)
215         }
216         // if t._readerNowPieces.Contains(piece - 1) {
217         //      return PiecePriorityNext
218         // }
219         if p.t.readerReadaheadPieces().Contains(bitmap.BitIndex(p.index)) {
220                 ret.Raise(PiecePriorityReadahead)
221         }
222         ret.Raise(p.priority)
223         return
224 }
225
226 func (p *Piece) uncachedPriority() (ret piecePriority) {
227         if p.hashing || p.marking || p.t.pieceComplete(p.index) || p.queuedForHash() {
228                 return PiecePriorityNone
229         }
230         return p.purePriority()
231 }
232
233 // Tells the Client to refetch the completion status from storage, updating priority etc. if
234 // necessary. Might be useful if you know the state of the piece data has changed externally.
235 func (p *Piece) UpdateCompletion() {
236         p.t.cl.lock()
237         defer p.t.cl.unlock()
238         p.t.updatePieceCompletion(p.index)
239 }
240
241 func (p *Piece) completion() (ret storage.Completion) {
242         ret.Complete = p.t.pieceComplete(p.index)
243         ret.Ok = p.storageCompletionOk
244         return
245 }
246
247 func (p *Piece) allChunksDirty() bool {
248         return p.numDirtyChunks() == p.numChunks()
249 }
250
251 func (p *Piece) State() PieceState {
252         return p.t.PieceState(p.index)
253 }
254
255 func (p *Piece) requestIndexOffset() RequestIndex {
256         return p.t.pieceRequestIndexOffset(p.index)
257 }
258
259 func (p *Piece) availability() int {
260         return len(p.t.connsWithAllPieces) + p.relativeAvailability
261 }
262
263 // For v2 torrents, files are aligned to pieces so there should always only be a single file for a
264 // given piece.
265 func (p *Piece) mustGetOnlyFile() *File {
266         if len(p.files) != 1 {
267                 panic(len(p.files))
268         }
269         return p.files[0]
270 }