Track dirty chunks in a single bitmap on Torrent
[btrtrc.git] / piece.go
package torrent

import (
        "fmt"
        "sync"

        "github.com/anacrolix/chansync"
        "github.com/anacrolix/missinggo/v2/bitmap"

        "github.com/anacrolix/torrent/metainfo"
        pp "github.com/anacrolix/torrent/peer_protocol"
        "github.com/anacrolix/torrent/storage"
)

type Piece struct {
        // The completed piece SHA1 hash, from the metainfo "pieces" field.
        hash  *metainfo.Hash
        t     *Torrent
        index pieceIndex
        files []*File

        readerCond chansync.BroadcastCond

        numVerifies         int64
        hashing             bool
        marking             bool
        storageCompletionOk bool

        publicPieceState PieceState
        priority         piecePriority
        availability     int64

        // This can be locked when the Client lock is taken, but probably not vice versa.
        pendingWritesMutex sync.Mutex
        pendingWrites      int
        noPendingWrites    sync.Cond

        // Connections that have written data to this piece since its last check.
        // This can include connections that have closed.
        dirtiers map[*Peer]struct{}
}

func (p *Piece) String() string {
        return fmt.Sprintf("%s/%d", p.t.infoHash.HexString(), p.index)
}

func (p *Piece) Info() metainfo.Piece {
        return p.t.info.Piece(int(p.index))
}

func (p *Piece) Storage() storage.Piece {
        return p.t.storage.Piece(p.Info())
}

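// A chunk is pending if its data hasn't been received (marked dirty) since the piece was last
// checked.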
func (p *Piece) pendingChunkIndex(chunkIndex chunkIndexType) bool {
        return !p.chunkIndexDirty(chunkIndex)
}

func (p *Piece) pendingChunk(cs ChunkSpec, chunkSize pp.Integer) bool {
        return p.pendingChunkIndex(chunkIndexFromChunkSpec(cs, chunkSize))
}

func (p *Piece) hasDirtyChunks() bool {
        return p.numDirtyChunks() != 0
}

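// Dirty chunks for all pieces are tracked in a single bitmap on the Torrent, keyed by request
// index. Count the entries that fall inside this piece's range.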
func (p *Piece) numDirtyChunks() chunkIndexType {
        return chunkIndexType(roaringBitmapRangeCardinality(
                &p.t.dirtyChunks,
                p.requestIndexOffset(),
                p.t.pieceRequestIndexOffset(p.index+1)))
}

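// Marks the chunk as dirty (its data has been received), so it is no longer pending, and wakes
// anyone waiting on readerCond.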
func (p *Piece) unpendChunkIndex(i chunkIndexType) {
        p.t.dirtyChunks.Add(p.requestIndexOffset() + i)
        p.readerCond.Broadcast()
}

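// Clears the chunk's dirty bit so that it counts as pending again.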
func (p *Piece) pendChunkIndex(i RequestIndex) {
        p.t.dirtyChunks.Remove(p.requestIndexOffset() + i)
}

func (p *Piece) numChunks() chunkIndexType {
        return p.t.pieceNumChunks(p.index)
}

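// pendingWrites counts writes to this piece's storage that have been started but not yet
// finished (see decrementPendingWrites and waitNoPendingWrites).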
func (p *Piece) incrementPendingWrites() {
        p.pendingWritesMutex.Lock()
        p.pendingWrites++
        p.pendingWritesMutex.Unlock()
}

func (p *Piece) decrementPendingWrites() {
        p.pendingWritesMutex.Lock()
        if p.pendingWrites == 0 {
                panic("assertion")
        }
        p.pendingWrites--
        if p.pendingWrites == 0 {
                p.noPendingWrites.Broadcast()
        }
        p.pendingWritesMutex.Unlock()
}

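// Blocks until all outstanding writes to this piece have completed.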
func (p *Piece) waitNoPendingWrites() {
        p.pendingWritesMutex.Lock()
        for p.pendingWrites != 0 {
                p.noPendingWrites.Wait()
        }
        p.pendingWritesMutex.Unlock()
}

func (p *Piece) chunkIndexDirty(chunk chunkIndexType) bool {
        return p.t.dirtyChunks.Contains(p.requestIndexOffset() + chunk)
}

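// The ChunkSpec (offset and length within the piece) for the given chunk index.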
func (p *Piece) chunkIndexSpec(chunk chunkIndexType) ChunkSpec {
        return chunkIndexSpec(pp.Integer(chunk), p.length(), p.chunkSize())
}

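// The number of bytes covered by this piece's dirty chunks, allowing for a shorter final chunk.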
func (p *Piece) numDirtyBytes() (ret pp.Integer) {
        // defer func() {
        //      if ret > p.length() {
        //              panic("too many dirty bytes")
        //      }
        // }()
        numRegularDirtyChunks := p.numDirtyChunks()
        if p.chunkIndexDirty(p.numChunks() - 1) {
                numRegularDirtyChunks--
                ret += p.chunkIndexSpec(p.lastChunkIndex()).Length
        }
        ret += pp.Integer(numRegularDirtyChunks) * p.chunkSize()
        return
}

func (p *Piece) length() pp.Integer {
        return p.t.pieceLength(p.index)
}

func (p *Piece) chunkSize() pp.Integer {
        return p.t.chunkSize
}

func (p *Piece) lastChunkIndex() chunkIndexType {
        return p.numChunks() - 1
}

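// The number of bytes still needed before the piece is complete.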
func (p *Piece) bytesLeft() (ret pp.Integer) {
        if p.t.pieceComplete(p.index) {
                return 0
        }
        return p.length() - p.numDirtyBytes()
}

// Forces the piece data to be rehashed. Blocks until the resulting hash check has completed.
func (p *Piece) VerifyData() {
        p.t.cl.lock()
        defer p.t.cl.unlock()
        target := p.numVerifies + 1
        if p.hashing {
                target++
        }
        //log.Printf("target: %d", target)
        p.t.queuePieceCheck(p.index)
        for {
                //log.Printf("got %d verifies", p.numVerifies)
                if p.numVerifies >= target {
                        break
                }
                p.t.cl.event.Wait()
        }
        // log.Print("done")
}

func (p *Piece) queuedForHash() bool {
        return p.t.piecesQueuedForHash.Get(bitmap.BitIndex(p.index))
}

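// The byte offset of the start of this piece within the whole torrent.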
func (p *Piece) torrentBeginOffset() int64 {
        return int64(p.index) * p.t.info.PieceLength
}

func (p *Piece) torrentEndOffset() int64 {
        return p.torrentBeginOffset() + int64(p.length())
}

func (p *Piece) SetPriority(prio piecePriority) {
        p.t.cl.lock()
        defer p.t.cl.unlock()
        p.priority = prio
        p.t.updatePiecePriority(p.index)
}

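// The piece's priority ignoring completion and hashing state: the highest of the file
// priorities, any reader-driven priority, and the explicitly set priority.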
func (p *Piece) purePriority() (ret piecePriority) {
        for _, f := range p.files {
                ret.Raise(f.prio)
        }
        if p.t.readerNowPieces().Contains(bitmap.BitIndex(p.index)) {
                ret.Raise(PiecePriorityNow)
        }
        // if t._readerNowPieces.Contains(piece - 1) {
        //      return PiecePriorityNext
        // }
        if p.t.readerReadaheadPieces().Contains(bitmap.BitIndex(p.index)) {
                ret.Raise(PiecePriorityReadahead)
        }
        ret.Raise(p.priority)
        return
}

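// The effective priority: PiecePriorityNone if the piece is complete, queued for hashing, or
// currently being hashed; otherwise purePriority.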
func (p *Piece) uncachedPriority() (ret piecePriority) {
        if p.t.pieceComplete(p.index) || p.t.pieceQueuedForHash(p.index) || p.t.hashingPiece(p.index) {
                return PiecePriorityNone
        }
        return p.purePriority()
}

// Tells the Client to refetch the completion status from storage, updating priority etc. if
// necessary. Might be useful if you know the state of the piece data has changed externally.
func (p *Piece) UpdateCompletion() {
        p.t.cl.lock()
        defer p.t.cl.unlock()
        p.t.updatePieceCompletion(p.index)
}

func (p *Piece) completion() (ret storage.Completion) {
        ret.Complete = p.t.pieceComplete(p.index)
        ret.Ok = p.storageCompletionOk
        return
}

func (p *Piece) allChunksDirty() bool {
        return p.numDirtyChunks() == p.numChunks()
}

func (p *Piece) State() PieceState {
        return p.t.PieceState(p.index)
}

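// Calls f for each chunk index in the piece that isn't dirty yet.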
func (p *Piece) iterUndirtiedChunks(f func(cs chunkIndexType)) {
        for i := chunkIndexType(0); i < p.numChunks(); i++ {
                if p.chunkIndexDirty(i) {
                        continue
                }
                f(i)
        }
}

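// The index of this piece's first chunk in the torrent-wide request index space used by
// Torrent.dirtyChunks.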
func (p *Piece) requestIndexOffset() RequestIndex {
        return p.t.pieceRequestIndexOffset(p.index)
}