]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
c74476eb8d7612350f422f47db283586a3801fc4
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "expvar"
6         "fmt"
7         "io"
8         "log"
9         "math/rand"
10         "net"
11         "sort"
12         "sync"
13         "time"
14
15         "github.com/anacrolix/missinggo"
16         "github.com/anacrolix/missinggo/pubsub"
17         "github.com/bradfitz/iter"
18
19         "github.com/anacrolix/torrent/bencode"
20         "github.com/anacrolix/torrent/metainfo"
21         pp "github.com/anacrolix/torrent/peer_protocol"
22         "github.com/anacrolix/torrent/tracker"
23 )
24
25 func (t *torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
26         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
27 }
28
29 func (t *torrent) pieceNumPendingBytes(index int) (count pp.Integer) {
30         if t.pieceComplete(index) {
31                 return
32         }
33         piece := &t.Pieces[index]
34         count = t.pieceLength(index)
35         if !piece.EverHashed {
36                 return
37         }
38         for i, dirty := range piece.DirtyChunks {
39                 if dirty {
40                         count -= t.chunkIndexSpec(i, index).Length
41                 }
42         }
43         return
44 }
45
46 type peersKey struct {
47         IPBytes string
48         Port    int
49 }
50
51 // Is not aware of Client. Maintains state of torrent for with-in a Client.
52 type torrent struct {
53         stateMu sync.Mutex
54         closing chan struct{}
55
56         // Closed when no more network activity is desired. This includes
57         // announcing, and communicating with peers.
58         ceasingNetworking chan struct{}
59
60         InfoHash InfoHash
61         Pieces   []piece
62         // Values are the piece indices that changed.
63         pieceStateChanges *pubsub.PubSub
64         chunkSize         pp.Integer
65         // Chunks that are wanted before all others. This is for
66         // responsive/streaming readers that want to unblock ASAP.
67         urgent map[request]struct{}
68         // Total length of the torrent in bytes. Stored because it's not O(1) to
69         // get this from the info dict.
70         length int64
71
72         data Data
73
74         // The info dict. Nil if we don't have it (yet).
75         Info *metainfo.Info
76         // Active peer connections, running message stream loops.
77         Conns []*connection
78         // Set of addrs to which we're attempting to connect. Connections are
79         // half-open until all handshakes are completed.
80         HalfOpen map[string]struct{}
81
82         // Reserve of peers to connect to. A peer can be both here and in the
83         // active connections if were told about the peer after connecting with
84         // them. That encourages us to reconnect to peers that are well known.
85         Peers     map[peersKey]Peer
86         wantPeers sync.Cond
87
88         // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
89         // mirror their respective URLs from the announce-list metainfo key.
90         Trackers [][]tracker.Client
91         // Name used if the info name isn't available.
92         displayName string
93         // The bencoded bytes of the info dict.
94         MetaData []byte
95         // Each element corresponds to the 16KiB metadata pieces. If true, we have
96         // received that piece.
97         metadataHave []bool
98
99         // Closed when .Info is set.
100         gotMetainfo chan struct{}
101
102         connPiecePriorites sync.Pool
103 }
104
105 var (
106         piecePrioritiesReused = expvar.NewInt("piecePrioritiesReused")
107         piecePrioritiesNew    = expvar.NewInt("piecePrioritiesNew")
108 )
109
110 func (t *torrent) setDisplayName(dn string) {
111         t.displayName = dn
112 }
113
114 func (t *torrent) newConnPiecePriorities() []int {
115         _ret := t.connPiecePriorites.Get()
116         if _ret != nil {
117                 piecePrioritiesReused.Add(1)
118                 return _ret.([]int)
119         }
120         piecePrioritiesNew.Add(1)
121         return rand.Perm(t.numPieces())
122 }
123
124 func (t *torrent) pieceComplete(piece int) bool {
125         // TODO: This is called when setting metadata, and before storage is
126         // assigned, which doesn't seem right.
127         return t.data != nil && t.data.PieceComplete(piece)
128 }
129
130 func (t *torrent) numConnsUnchoked() (num int) {
131         for _, c := range t.Conns {
132                 if !c.PeerChoked {
133                         num++
134                 }
135         }
136         return
137 }
138
139 // There's a connection to that address already.
140 func (t *torrent) addrActive(addr string) bool {
141         if _, ok := t.HalfOpen[addr]; ok {
142                 return true
143         }
144         for _, c := range t.Conns {
145                 if c.remoteAddr().String() == addr {
146                         return true
147                 }
148         }
149         return false
150 }
151
152 func (t *torrent) worstConns(cl *Client) (wcs *worstConns) {
153         wcs = &worstConns{
154                 c:  make([]*connection, 0, len(t.Conns)),
155                 t:  t,
156                 cl: cl,
157         }
158         for _, c := range t.Conns {
159                 select {
160                 case <-c.closing:
161                 default:
162                         wcs.c = append(wcs.c, c)
163                 }
164         }
165         return
166 }
167
168 func (t *torrent) ceaseNetworking() {
169         t.stateMu.Lock()
170         defer t.stateMu.Unlock()
171         select {
172         case <-t.ceasingNetworking:
173                 return
174         default:
175         }
176         close(t.ceasingNetworking)
177         for _, c := range t.Conns {
178                 c.Close()
179         }
180 }
181
182 func (t *torrent) addPeer(p Peer, cl *Client) {
183         cl.openNewConns(t)
184         if len(t.Peers) >= torrentPeersHighWater {
185                 return
186         }
187         key := peersKey{string(p.IP), p.Port}
188         if _, ok := t.Peers[key]; ok {
189                 return
190         }
191         t.Peers[key] = p
192         peersAddedBySource.Add(string(p.Source), 1)
193         cl.openNewConns(t)
194
195 }
196
197 func (t *torrent) invalidateMetadata() {
198         t.MetaData = nil
199         t.metadataHave = nil
200         t.Info = nil
201 }
202
203 func (t *torrent) saveMetadataPiece(index int, data []byte) {
204         if t.haveInfo() {
205                 return
206         }
207         if index >= len(t.metadataHave) {
208                 log.Printf("%s: ignoring metadata piece %d", t, index)
209                 return
210         }
211         copy(t.MetaData[(1<<14)*index:], data)
212         t.metadataHave[index] = true
213 }
214
215 func (t *torrent) metadataPieceCount() int {
216         return (len(t.MetaData) + (1 << 14) - 1) / (1 << 14)
217 }
218
219 func (t *torrent) haveMetadataPiece(piece int) bool {
220         if t.haveInfo() {
221                 return (1<<14)*piece < len(t.MetaData)
222         } else {
223                 return piece < len(t.metadataHave) && t.metadataHave[piece]
224         }
225 }
226
227 func (t *torrent) metadataSizeKnown() bool {
228         return t.MetaData != nil
229 }
230
231 func (t *torrent) metadataSize() int {
232         return len(t.MetaData)
233 }
234
235 func infoPieceHashes(info *metainfo.Info) (ret []string) {
236         for i := 0; i < len(info.Pieces); i += 20 {
237                 ret = append(ret, string(info.Pieces[i:i+20]))
238         }
239         return
240 }
241
242 // Called when metadata for a torrent becomes available.
243 func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte) (err error) {
244         err = validateInfo(md)
245         if err != nil {
246                 err = fmt.Errorf("bad info: %s", err)
247                 return
248         }
249         t.Info = md
250         t.length = 0
251         for _, f := range t.Info.UpvertedFiles() {
252                 t.length += f.Length
253         }
254         t.MetaData = infoBytes
255         t.metadataHave = nil
256         hashes := infoPieceHashes(md)
257         t.Pieces = make([]piece, len(hashes))
258         for i, hash := range hashes {
259                 piece := &t.Pieces[i]
260                 piece.noPendingWrites.L = &piece.pendingWritesMutex
261                 missinggo.CopyExact(piece.Hash[:], hash)
262         }
263         for _, conn := range t.Conns {
264                 t.initRequestOrdering(conn)
265                 if err := conn.setNumPieces(t.numPieces()); err != nil {
266                         log.Printf("closing connection: %s", err)
267                         conn.Close()
268                 }
269         }
270         return
271 }
272
273 func (t *torrent) setStorage(td Data) (err error) {
274         if t.data != nil {
275                 t.data.Close()
276         }
277         t.data = td
278         return
279 }
280
281 func (t *torrent) haveAllMetadataPieces() bool {
282         if t.haveInfo() {
283                 return true
284         }
285         if t.metadataHave == nil {
286                 return false
287         }
288         for _, have := range t.metadataHave {
289                 if !have {
290                         return false
291                 }
292         }
293         return true
294 }
295
296 func (t *torrent) setMetadataSize(bytes int64, cl *Client) {
297         if t.haveInfo() {
298                 // We already know the correct metadata size.
299                 return
300         }
301         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
302                 log.Printf("received bad metadata size: %d", bytes)
303                 return
304         }
305         if t.MetaData != nil && len(t.MetaData) == int(bytes) {
306                 return
307         }
308         t.MetaData = make([]byte, bytes)
309         t.metadataHave = make([]bool, (bytes+(1<<14)-1)/(1<<14))
310         for _, c := range t.Conns {
311                 cl.requestPendingMetadata(t, c)
312         }
313
314 }
315
316 // The current working name for the torrent. Either the name in the info dict,
317 // or a display name given such as by the dn value in a magnet link, or "".
318 func (t *torrent) Name() string {
319         if t.haveInfo() {
320                 return t.Info.Name
321         }
322         return t.displayName
323 }
324
325 func (t *torrent) pieceState(index int) (ret PieceState) {
326         p := &t.Pieces[index]
327         ret.Priority = p.Priority
328         if t.pieceComplete(index) {
329                 ret.Complete = true
330         }
331         if p.QueuedForHash || p.Hashing {
332                 ret.Checking = true
333         }
334         if !ret.Complete && t.piecePartiallyDownloaded(index) {
335                 ret.Partial = true
336         }
337         return
338 }
339
340 func (t *torrent) metadataPieceSize(piece int) int {
341         return metadataPieceSize(len(t.MetaData), piece)
342 }
343
344 func (t *torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
345         d := map[string]int{
346                 "msg_type": msgType,
347                 "piece":    piece,
348         }
349         if data != nil {
350                 d["total_size"] = len(t.MetaData)
351         }
352         p, err := bencode.Marshal(d)
353         if err != nil {
354                 panic(err)
355         }
356         return pp.Message{
357                 Type:            pp.Extended,
358                 ExtendedID:      byte(c.PeerExtensionIDs["ut_metadata"]),
359                 ExtendedPayload: append(p, data...),
360         }
361 }
362
363 func (t *torrent) pieceStateRuns() (ret []PieceStateRun) {
364         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
365                 ret = append(ret, PieceStateRun{
366                         PieceState: el.(PieceState),
367                         Length:     int(count),
368                 })
369         })
370         for index := range t.Pieces {
371                 rle.Append(t.pieceState(index), 1)
372         }
373         rle.Flush()
374         return
375 }
376
377 // Produces a small string representing a PieceStateRun.
378 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
379         ret = fmt.Sprintf("%d", psr.Length)
380         ret += func() string {
381                 switch psr.Priority {
382                 case PiecePriorityNext:
383                         return "N"
384                 case PiecePriorityNormal:
385                         return "."
386                 case PiecePriorityReadahead:
387                         return "R"
388                 case PiecePriorityNow:
389                         return "!"
390                 default:
391                         return ""
392                 }
393         }()
394         if psr.Checking {
395                 ret += "H"
396         }
397         if psr.Partial {
398                 ret += "P"
399         }
400         if psr.Complete {
401                 ret += "C"
402         }
403         return
404 }
405
406 func (t *torrent) writeStatus(w io.Writer, cl *Client) {
407         fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash)
408         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
409         if !t.haveInfo() {
410                 fmt.Fprintf(w, "Metadata have: ")
411                 for _, h := range t.metadataHave {
412                         fmt.Fprintf(w, "%c", func() rune {
413                                 if h {
414                                         return 'H'
415                                 } else {
416                                         return '.'
417                                 }
418                         }())
419                 }
420                 fmt.Fprintln(w)
421         }
422         fmt.Fprintf(w, "Piece length: %s\n", func() string {
423                 if t.haveInfo() {
424                         return fmt.Sprint(t.usualPieceSize())
425                 } else {
426                         return "?"
427                 }
428         }())
429         if t.haveInfo() {
430                 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
431                 fmt.Fprint(w, "Piece States:")
432                 for _, psr := range t.pieceStateRuns() {
433                         w.Write([]byte(" "))
434                         w.Write([]byte(pieceStateRunStatusChars(psr)))
435                 }
436                 fmt.Fprintln(w)
437         }
438         fmt.Fprintf(w, "Urgent:")
439         for req := range t.urgent {
440                 fmt.Fprintf(w, " %v", req)
441         }
442         fmt.Fprintln(w)
443         fmt.Fprintf(w, "Trackers: ")
444         for _, tier := range t.Trackers {
445                 for _, tr := range tier {
446                         fmt.Fprintf(w, "%q ", tr.String())
447                 }
448         }
449         fmt.Fprintf(w, "\n")
450         fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers))
451         fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen))
452         fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns))
453         sort.Sort(&worstConns{
454                 c:  t.Conns,
455                 t:  t,
456                 cl: cl,
457         })
458         for i, c := range t.Conns {
459                 fmt.Fprintf(w, "%2d. ", i+1)
460                 c.WriteStatus(w, t)
461         }
462 }
463
464 func (t *torrent) String() string {
465         s := t.Name()
466         if s == "" {
467                 s = fmt.Sprintf("%x", t.InfoHash)
468         }
469         return s
470 }
471
472 func (t *torrent) haveInfo() bool {
473         return t != nil && t.Info != nil
474 }
475
476 // TODO: Include URIs that weren't converted to tracker clients.
477 func (t *torrent) announceList() (al [][]string) {
478         for _, tier := range t.Trackers {
479                 var l []string
480                 for _, tr := range tier {
481                         l = append(l, tr.URL())
482                 }
483                 al = append(al, l)
484         }
485         return
486 }
487
488 // Returns a run-time generated MetaInfo that includes the info bytes and
489 // announce-list as currently known to the client.
490 func (t *torrent) MetaInfo() *metainfo.MetaInfo {
491         if t.MetaData == nil {
492                 panic("info bytes not set")
493         }
494         return &metainfo.MetaInfo{
495                 Info: metainfo.InfoEx{
496                         Info:  *t.Info,
497                         Bytes: t.MetaData,
498                 },
499                 CreationDate: time.Now().Unix(),
500                 Comment:      "dynamic metainfo from client",
501                 CreatedBy:    "go.torrent",
502                 AnnounceList: t.announceList(),
503         }
504 }
505
506 func (t *torrent) bytesLeft() (left int64) {
507         if !t.haveInfo() {
508                 return -1
509         }
510         for i := 0; i < t.numPieces(); i++ {
511                 left += int64(t.pieceNumPendingBytes(i))
512         }
513         return
514 }
515
516 func (t *torrent) piecePartiallyDownloaded(index int) bool {
517         pendingBytes := t.pieceNumPendingBytes(index)
518         return pendingBytes != 0 && pendingBytes != t.pieceLength(index)
519 }
520
521 func numChunksForPiece(chunkSize int, pieceSize int) int {
522         return (pieceSize + chunkSize - 1) / chunkSize
523 }
524
525 func (t *torrent) usualPieceSize() int {
526         return int(t.Info.PieceLength)
527 }
528
529 func (t *torrent) lastPieceSize() int {
530         return int(t.pieceLength(t.numPieces() - 1))
531 }
532
533 func (t *torrent) numPieces() int {
534         return t.Info.NumPieces()
535 }
536
537 func (t *torrent) numPiecesCompleted() (num int) {
538         for i := range iter.N(t.Info.NumPieces()) {
539                 if t.pieceComplete(i) {
540                         num++
541                 }
542         }
543         return
544 }
545
546 func (t *torrent) isClosed() bool {
547         select {
548         case <-t.closing:
549                 return true
550         default:
551                 return false
552         }
553 }
554
555 func (t *torrent) close() (err error) {
556         if t.isClosed() {
557                 return
558         }
559         t.ceaseNetworking()
560         close(t.closing)
561         if c, ok := t.data.(io.Closer); ok {
562                 c.Close()
563         }
564         for _, conn := range t.Conns {
565                 conn.Close()
566         }
567         t.pieceStateChanges.Close()
568         return
569 }
570
571 func (t *torrent) requestOffset(r request) int64 {
572         return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
573 }
574
575 // Return the request that would include the given offset into the torrent
576 // data. Returns !ok if there is no such request.
577 func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
578         return torrentOffsetRequest(t.length, t.Info.PieceLength, int64(t.chunkSize), off)
579 }
580
581 func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
582         n, err := t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
583         if err == nil && n != len(data) {
584                 err = io.ErrShortWrite
585         }
586         return
587 }
588
589 func (t *torrent) bitfield() (bf []bool) {
590         for i := range t.Pieces {
591                 bf = append(bf, t.havePiece(i))
592         }
593         return
594 }
595
596 func (t *torrent) validOutgoingRequest(r request) bool {
597         if r.Index >= pp.Integer(t.Info.NumPieces()) {
598                 return false
599         }
600         if r.Begin%t.chunkSize != 0 {
601                 return false
602         }
603         if r.Length > t.chunkSize {
604                 return false
605         }
606         pieceLength := t.pieceLength(int(r.Index))
607         if r.Begin+r.Length > pieceLength {
608                 return false
609         }
610         return r.Length == t.chunkSize || r.Begin+r.Length == pieceLength
611 }
612
613 func (t *torrent) pieceChunks(piece int) (css []chunkSpec) {
614         css = make([]chunkSpec, 0, (t.pieceLength(piece)+t.chunkSize-1)/t.chunkSize)
615         var cs chunkSpec
616         for left := t.pieceLength(piece); left != 0; left -= cs.Length {
617                 cs.Length = left
618                 if cs.Length > t.chunkSize {
619                         cs.Length = t.chunkSize
620                 }
621                 css = append(css, cs)
622                 cs.Begin += cs.Length
623         }
624         return
625 }
626
627 func (t *torrent) pieceNumChunks(piece int) int {
628         return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
629 }
630
631 func (t *torrent) pendAllChunkSpecs(pieceIndex int) {
632         t.Pieces[pieceIndex].DirtyChunks = nil
633 }
634
635 type Peer struct {
636         Id     [20]byte
637         IP     net.IP
638         Port   int
639         Source peerSource
640         // Peer is known to support encryption.
641         SupportsEncryption bool
642 }
643
644 func (t *torrent) pieceLength(piece int) (len_ pp.Integer) {
645         if piece < 0 || piece > t.Info.NumPieces() {
646                 return
647         }
648         if int(piece) == t.numPieces()-1 {
649                 len_ = pp.Integer(t.length % t.Info.PieceLength)
650         }
651         if len_ == 0 {
652                 len_ = pp.Integer(t.Info.PieceLength)
653         }
654         return
655 }
656
657 func (t *torrent) hashPiece(piece int) (ps pieceSum) {
658         hash := pieceHash.New()
659         p := &t.Pieces[piece]
660         p.pendingWritesMutex.Lock()
661         for p.pendingWrites != 0 {
662                 p.noPendingWrites.Wait()
663         }
664         p.pendingWritesMutex.Unlock()
665         pl := t.Info.Piece(int(piece)).Length()
666         n, err := t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, pl)
667         if err != nil {
668                 if err != io.ErrUnexpectedEOF {
669                         log.Printf("error hashing piece with %T: %s", t.data, err)
670                 }
671                 return
672         }
673         if n != pl {
674                 panic(fmt.Sprintf("%T: %d != %d", t.data, n, pl))
675         }
676         missinggo.CopyExact(ps[:], hash.Sum(nil))
677         return
678 }
679
680 func (t *torrent) haveAllPieces() bool {
681         if !t.haveInfo() {
682                 return false
683         }
684         for i := range t.Pieces {
685                 if !t.pieceComplete(i) {
686                         return false
687                 }
688         }
689         return true
690 }
691
692 func (me *torrent) haveAnyPieces() bool {
693         for i := range me.Pieces {
694                 if me.pieceComplete(i) {
695                         return true
696                 }
697         }
698         return false
699 }
700
701 func (t *torrent) havePiece(index int) bool {
702         return t.haveInfo() && t.pieceComplete(index)
703 }
704
705 func (t *torrent) haveChunk(r request) (ret bool) {
706         // defer func() {
707         //      log.Println("have chunk", r, ret)
708         // }()
709         if !t.haveInfo() {
710                 return false
711         }
712         if t.pieceComplete(int(r.Index)) {
713                 return true
714         }
715         p := &t.Pieces[r.Index]
716         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
717 }
718
719 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
720         return int(cs.Begin / chunkSize)
721 }
722
723 // TODO: This should probably be called wantPiece.
724 func (t *torrent) wantChunk(r request) bool {
725         if !t.wantPiece(int(r.Index)) {
726                 return false
727         }
728         if t.Pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
729                 return true
730         }
731         _, ok := t.urgent[r]
732         return ok
733 }
734
735 func (t *torrent) urgentChunkInPiece(piece int) bool {
736         p := pp.Integer(piece)
737         for req := range t.urgent {
738                 if req.Index == p {
739                         return true
740                 }
741         }
742         return false
743 }
744
745 // TODO: This should be called wantPieceIndex.
746 func (t *torrent) wantPiece(index int) bool {
747         if !t.haveInfo() {
748                 return false
749         }
750         p := &t.Pieces[index]
751         if p.QueuedForHash {
752                 return false
753         }
754         if p.Hashing {
755                 return false
756         }
757         if p.Priority == PiecePriorityNone {
758                 if !t.urgentChunkInPiece(index) {
759                         return false
760                 }
761         }
762         // Put piece complete check last, since it's the slowest as it can involve
763         // calling out into external data stores.
764         return !t.pieceComplete(index)
765 }
766
767 func (t *torrent) connHasWantedPieces(c *connection) bool {
768         return c.pieceRequestOrder != nil && !c.pieceRequestOrder.Empty()
769 }
770
771 func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
772         for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
773                 pieces = append(pieces, int(i))
774         }
775         return
776 }
777
778 func (t *torrent) worstBadConn(cl *Client) *connection {
779         wcs := t.worstConns(cl)
780         heap.Init(wcs)
781         for wcs.Len() != 0 {
782                 c := heap.Pop(wcs).(*connection)
783                 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
784                         return c
785                 }
786                 if wcs.Len() >= (socketsPerTorrent+1)/2 {
787                         // Give connections 1 minute to prove themselves.
788                         if time.Since(c.completedHandshake) > time.Minute {
789                                 return c
790                         }
791                 }
792         }
793         return nil
794 }
795
796 func (t *torrent) publishPieceChange(piece int) {
797         cur := t.pieceState(piece)
798         p := &t.Pieces[piece]
799         if cur != p.PublicPieceState {
800                 t.pieceStateChanges.Publish(piece)
801         }
802         p.PublicPieceState = cur
803 }
804
805 func (t *torrent) pieceNumPendingChunks(piece int) int {
806         return t.pieceNumChunks(piece) - t.Pieces[piece].numDirtyChunks()
807 }
808
809 func (t *torrent) pieceAllDirty(piece int) bool {
810         p := &t.Pieces[piece]
811         if len(p.DirtyChunks) != t.pieceNumChunks(piece) {
812                 return false
813         }
814         for _, dirty := range p.DirtyChunks {
815                 if !dirty {
816                         return false
817                 }
818         }
819         return true
820 }