]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
3374778b2c12bedd0b362fdc691faf14a896a846
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "crypto/sha1"
6         "errors"
7         "fmt"
8         "io"
9         "log"
10         "math"
11         "math/rand"
12         "net"
13         "os"
14         "strconv"
15         "sync"
16         "text/tabwriter"
17         "time"
18
19         "github.com/anacrolix/dht"
20         "github.com/anacrolix/missinggo"
21         "github.com/anacrolix/missinggo/bitmap"
22         "github.com/anacrolix/missinggo/perf"
23         "github.com/anacrolix/missinggo/pubsub"
24         "github.com/anacrolix/missinggo/slices"
25         "github.com/bradfitz/iter"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/metainfo"
29         pp "github.com/anacrolix/torrent/peer_protocol"
30         "github.com/anacrolix/torrent/storage"
31         "github.com/anacrolix/torrent/tracker"
32 )
33
34 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
35         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
36 }
37
38 type peersKey struct {
39         IPBytes string
40         Port    int
41 }
42
43 // Maintains state of torrent within a Client.
44 type Torrent struct {
45         cl *Client
46
47         networkingEnabled bool
48         requestStrategy   int
49
50         closed   missinggo.Event
51         infoHash metainfo.Hash
52         pieces   []Piece
53         // Values are the piece indices that changed.
54         pieceStateChanges *pubsub.PubSub
55         // The size of chunks to request from peers over the wire. This is
56         // normally 16KiB by convention these days.
57         chunkSize pp.Integer
58         chunkPool *sync.Pool
59         // Total length of the torrent in bytes. Stored because it's not O(1) to
60         // get this from the info dict.
61         length int64
62
63         // The storage to open when the info dict becomes available.
64         storageOpener *storage.Client
65         // Storage for torrent data.
66         storage *storage.Torrent
67
68         metainfo metainfo.MetaInfo
69
70         // The info dict. nil if we don't have it (yet).
71         info *metainfo.Info
72
73         // Active peer connections, running message stream loops.
74         conns               map[*connection]struct{}
75         maxEstablishedConns int
76         // Set of addrs to which we're attempting to connect. Connections are
77         // half-open until all handshakes are completed.
78         halfOpen    map[string]Peer
79         fastestConn *connection
80
81         // Reserve of peers to connect to. A peer can be both here and in the
82         // active connections if were told about the peer after connecting with
83         // them. That encourages us to reconnect to peers that are well known in
84         // the swarm.
85         peers          map[peersKey]Peer
86         wantPeersEvent missinggo.Event
87         // An announcer for each tracker URL.
88         trackerAnnouncers map[string]*trackerScraper
89         // How many times we've initiated a DHT announce. TODO: Move into stats.
90         numDHTAnnounces int
91
92         // Name used if the info name isn't available. Should be cleared when the
93         // Info does become available.
94         displayName string
95         // The bencoded bytes of the info dict. This is actively manipulated if
96         // the info bytes aren't initially available, and we try to fetch them
97         // from peers.
98         metadataBytes []byte
99         // Each element corresponds to the 16KiB metadata pieces. If true, we have
100         // received that piece.
101         metadataCompletedChunks []bool
102
103         // Set when .Info is obtained.
104         gotMetainfo missinggo.Event
105
106         readers               map[*Reader]struct{}
107         readerNowPieces       bitmap.Bitmap
108         readerReadaheadPieces bitmap.Bitmap
109
110         // The indexes of pieces we want with normal priority, that aren't
111         // currently available.
112         pendingPieces bitmap.Bitmap
113         // A cache of completed piece indices.
114         completedPieces bitmap.Bitmap
115
116         // A pool of piece priorities []int for assignment to new connections.
117         // These "inclinations" are used to give connections preference for
118         // different pieces.
119         connPieceInclinationPool sync.Pool
120         // Torrent-level statistics.
121         stats TorrentStats
122 }
123
124 // Returns a channel that is closed when the Torrent is closed.
125 func (t *Torrent) Closed() <-chan struct{} {
126         return t.closed.LockedChan(&t.cl.mu)
127 }
128
129 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
130 // pending, and half-open peers.
131 func (t *Torrent) KnownSwarm() (ks []Peer) {
132         // Add pending peers to the list
133         for _, peer := range t.peers {
134                 ks = append(ks, peer)
135         }
136
137         // Add half-open peers to the list
138         for _, peer := range t.halfOpen {
139                 ks = append(ks, peer)
140         }
141
142         // Add active peers to the list
143         for conn := range t.conns {
144                 host, portString, err := net.SplitHostPort(conn.remoteAddr().String())
145                 if err != nil {
146                         panic(err)
147                 }
148
149                 ip := net.ParseIP(host)
150                 port, err := strconv.Atoi(portString)
151                 if err != nil {
152                         panic(err)
153                 }
154
155                 ks = append(ks, Peer{
156                         Id:     conn.PeerID,
157                         IP:     ip,
158                         Port:   port,
159                         Source: conn.Discovery,
160                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
161                         // > But if we're not connected to them with an encrypted connection, I couldn't say
162                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
163                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
164                         // > connection. It's probably easiest to do the latter for now.
165                         // https://github.com/anacrolix/torrent/pull/188
166                         SupportsEncryption: conn.headerEncrypted,
167                 })
168         }
169
170         return
171 }
172
173 func (t *Torrent) setChunkSize(size pp.Integer) {
174         t.chunkSize = size
175         t.chunkPool = &sync.Pool{
176                 New: func() interface{} {
177                         return make([]byte, size)
178                 },
179         }
180 }
181
182 func (t *Torrent) setDisplayName(dn string) {
183         if t.haveInfo() {
184                 return
185         }
186         t.displayName = dn
187 }
188
189 func (t *Torrent) pieceComplete(piece int) bool {
190         return t.completedPieces.Get(piece)
191 }
192
193 func (t *Torrent) pieceCompleteUncached(piece int) bool {
194         return t.pieces[piece].Storage().GetIsComplete()
195 }
196
197 // There's a connection to that address already.
198 func (t *Torrent) addrActive(addr string) bool {
199         if _, ok := t.halfOpen[addr]; ok {
200                 return true
201         }
202         for c := range t.conns {
203                 if c.remoteAddr().String() == addr {
204                         return true
205                 }
206         }
207         return false
208 }
209
210 func (t *Torrent) unclosedConnsAsSlice() (ret []*connection) {
211         ret = make([]*connection, 0, len(t.conns))
212         for c := range t.conns {
213                 if !c.closed.IsSet() {
214                         ret = append(ret, c)
215                 }
216         }
217         return
218 }
219
220 func (t *Torrent) addPeer(p Peer) {
221         cl := t.cl
222         cl.openNewConns(t)
223         if len(t.peers) >= torrentPeersHighWater {
224                 return
225         }
226         key := peersKey{string(p.IP), p.Port}
227         if _, ok := t.peers[key]; ok {
228                 return
229         }
230         t.peers[key] = p
231         peersAddedBySource.Add(string(p.Source), 1)
232         cl.openNewConns(t)
233
234 }
235
236 func (t *Torrent) invalidateMetadata() {
237         for i := range t.metadataCompletedChunks {
238                 t.metadataCompletedChunks[i] = false
239         }
240         t.info = nil
241 }
242
243 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
244         if t.haveInfo() {
245                 return
246         }
247         if index >= len(t.metadataCompletedChunks) {
248                 log.Printf("%s: ignoring metadata piece %d", t, index)
249                 return
250         }
251         copy(t.metadataBytes[(1<<14)*index:], data)
252         t.metadataCompletedChunks[index] = true
253 }
254
255 func (t *Torrent) metadataPieceCount() int {
256         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
257 }
258
259 func (t *Torrent) haveMetadataPiece(piece int) bool {
260         if t.haveInfo() {
261                 return (1<<14)*piece < len(t.metadataBytes)
262         } else {
263                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
264         }
265 }
266
267 func (t *Torrent) metadataSizeKnown() bool {
268         return t.metadataBytes != nil
269 }
270
271 func (t *Torrent) metadataSize() int {
272         return len(t.metadataBytes)
273 }
274
275 func infoPieceHashes(info *metainfo.Info) (ret []string) {
276         for i := 0; i < len(info.Pieces); i += sha1.Size {
277                 ret = append(ret, string(info.Pieces[i:i+sha1.Size]))
278         }
279         return
280 }
281
282 func (t *Torrent) makePieces() {
283         hashes := infoPieceHashes(t.info)
284         t.pieces = make([]Piece, len(hashes))
285         for i, hash := range hashes {
286                 piece := &t.pieces[i]
287                 piece.t = t
288                 piece.index = i
289                 piece.noPendingWrites.L = &piece.pendingWritesMutex
290                 missinggo.CopyExact(piece.hash[:], hash)
291         }
292 }
293
294 // Called when metadata for a torrent becomes available.
295 func (t *Torrent) setInfoBytes(b []byte) error {
296         if t.haveInfo() {
297                 return nil
298         }
299         if metainfo.HashBytes(b) != t.infoHash {
300                 return errors.New("info bytes have wrong hash")
301         }
302         var info metainfo.Info
303         err := bencode.Unmarshal(b, &info)
304         if err != nil {
305                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
306         }
307         err = validateInfo(&info)
308         if err != nil {
309                 return fmt.Errorf("bad info: %s", err)
310         }
311         defer t.updateWantPeersEvent()
312         t.info = &info
313         t.displayName = "" // Save a few bytes lol.
314         t.cl.event.Broadcast()
315         t.gotMetainfo.Set()
316         t.storage, err = t.storageOpener.OpenTorrent(t.info, t.infoHash)
317         if err != nil {
318                 return fmt.Errorf("error opening torrent storage: %s", err)
319         }
320         t.length = 0
321         for _, f := range t.info.UpvertedFiles() {
322                 t.length += f.Length
323         }
324         t.metadataBytes = b
325         t.metadataCompletedChunks = nil
326         t.makePieces()
327         for conn := range t.conns {
328                 if err := conn.setNumPieces(t.numPieces()); err != nil {
329                         log.Printf("closing connection: %s", err)
330                         conn.Close()
331                 }
332         }
333         for i := range t.pieces {
334                 t.updatePieceCompletion(i)
335                 // t.pieces[i].QueuedForHash = true
336         }
337         // go func() {
338         //      for i := range t.pieces {
339         //              t.verifyPiece(i)
340         //      }
341         // }()
342         return nil
343 }
344
345 func (t *Torrent) haveAllMetadataPieces() bool {
346         if t.haveInfo() {
347                 return true
348         }
349         if t.metadataCompletedChunks == nil {
350                 return false
351         }
352         for _, have := range t.metadataCompletedChunks {
353                 if !have {
354                         return false
355                 }
356         }
357         return true
358 }
359
360 // TODO: Propagate errors to disconnect peer.
361 func (t *Torrent) setMetadataSize(bytes int64) (err error) {
362         if t.haveInfo() {
363                 // We already know the correct metadata size.
364                 return
365         }
366         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
367                 return errors.New("bad size")
368         }
369         if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
370                 return
371         }
372         t.metadataBytes = make([]byte, bytes)
373         t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
374         for c := range t.conns {
375                 c.requestPendingMetadata()
376         }
377         return
378 }
379
380 // The current working name for the torrent. Either the name in the info dict,
381 // or a display name given such as by the dn value in a magnet link, or "".
382 func (t *Torrent) name() string {
383         if t.haveInfo() {
384                 return t.info.Name
385         }
386         return t.displayName
387 }
388
389 func (t *Torrent) pieceState(index int) (ret PieceState) {
390         p := &t.pieces[index]
391         ret.Priority = t.piecePriority(index)
392         if t.pieceComplete(index) {
393                 ret.Complete = true
394         }
395         if p.queuedForHash || p.hashing {
396                 ret.Checking = true
397         }
398         if !ret.Complete && t.piecePartiallyDownloaded(index) {
399                 ret.Partial = true
400         }
401         return
402 }
403
404 func (t *Torrent) metadataPieceSize(piece int) int {
405         return metadataPieceSize(len(t.metadataBytes), piece)
406 }
407
408 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
409         d := map[string]int{
410                 "msg_type": msgType,
411                 "piece":    piece,
412         }
413         if data != nil {
414                 d["total_size"] = len(t.metadataBytes)
415         }
416         p, err := bencode.Marshal(d)
417         if err != nil {
418                 panic(err)
419         }
420         return pp.Message{
421                 Type:            pp.Extended,
422                 ExtendedID:      c.PeerExtensionIDs["ut_metadata"],
423                 ExtendedPayload: append(p, data...),
424         }
425 }
426
427 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
428         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
429                 ret = append(ret, PieceStateRun{
430                         PieceState: el.(PieceState),
431                         Length:     int(count),
432                 })
433         })
434         for index := range t.pieces {
435                 rle.Append(t.pieceState(index), 1)
436         }
437         rle.Flush()
438         return
439 }
440
441 // Produces a small string representing a PieceStateRun.
442 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
443         ret = fmt.Sprintf("%d", psr.Length)
444         ret += func() string {
445                 switch psr.Priority {
446                 case PiecePriorityNext:
447                         return "N"
448                 case PiecePriorityNormal:
449                         return "."
450                 case PiecePriorityReadahead:
451                         return "R"
452                 case PiecePriorityNow:
453                         return "!"
454                 default:
455                         return ""
456                 }
457         }()
458         if psr.Checking {
459                 ret += "H"
460         }
461         if psr.Partial {
462                 ret += "P"
463         }
464         if psr.Complete {
465                 ret += "C"
466         }
467         return
468 }
469
470 func (t *Torrent) writeStatus(w io.Writer) {
471         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
472         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
473         if !t.haveInfo() {
474                 fmt.Fprintf(w, "Metadata have: ")
475                 for _, h := range t.metadataCompletedChunks {
476                         fmt.Fprintf(w, "%c", func() rune {
477                                 if h {
478                                         return 'H'
479                                 } else {
480                                         return '.'
481                                 }
482                         }())
483                 }
484                 fmt.Fprintln(w)
485         }
486         fmt.Fprintf(w, "Piece length: %s\n", func() string {
487                 if t.haveInfo() {
488                         return fmt.Sprint(t.usualPieceSize())
489                 } else {
490                         return "?"
491                 }
492         }())
493         if t.info != nil {
494                 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
495                 fmt.Fprint(w, "Piece States:")
496                 for _, psr := range t.pieceStateRuns() {
497                         w.Write([]byte(" "))
498                         w.Write([]byte(pieceStateRunStatusChars(psr)))
499                 }
500                 fmt.Fprintln(w)
501         }
502         fmt.Fprintf(w, "Reader Pieces:")
503         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
504                 fmt.Fprintf(w, " %d:%d", begin, end)
505                 return true
506         })
507         fmt.Fprintln(w)
508
509         fmt.Fprintf(w, "Trackers:\n")
510         func() {
511                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
512                 fmt.Fprintf(tw, "    URL\tNext announce\tLast announce\n")
513                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
514                         return l.url < r.url
515                 }).([]*trackerScraper) {
516                         fmt.Fprintf(tw, "    %s\n", ta.statusLine())
517                 }
518                 tw.Flush()
519         }()
520
521         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
522
523         fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
524         fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
525         fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
526         conns := t.connsAsSlice()
527         slices.Sort(conns, worseConn)
528         for i, c := range conns {
529                 fmt.Fprintf(w, "%2d. ", i+1)
530                 c.WriteStatus(w, t)
531         }
532 }
533
534 func (t *Torrent) haveInfo() bool {
535         return t.info != nil
536 }
537
538 // Returns a run-time generated MetaInfo that includes the info bytes and
539 // announce-list as currently known to the client.
540 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
541         return metainfo.MetaInfo{
542                 CreationDate: time.Now().Unix(),
543                 Comment:      "dynamic metainfo from client",
544                 CreatedBy:    "go.torrent",
545                 AnnounceList: t.metainfo.UpvertedAnnounceList(),
546                 InfoBytes:    t.metadataBytes,
547         }
548 }
549
550 func (t *Torrent) BytesMissing() int64 {
551         t.mu().RLock()
552         defer t.mu().RUnlock()
553         return t.bytesMissingLocked()
554 }
555
556 func (t *Torrent) bytesMissingLocked() int64 {
557         return t.bytesLeft()
558 }
559
560 func (t *Torrent) bytesLeft() (left int64) {
561         bitmap.Flip(t.completedPieces, 0, t.numPieces()).IterTyped(func(piece int) bool {
562                 p := t.pieces[piece]
563                 left += int64(p.length() - p.numDirtyBytes())
564                 return true
565         })
566         return
567 }
568
569 // Bytes left to give in tracker announces.
570 func (t *Torrent) bytesLeftAnnounce() uint64 {
571         if t.haveInfo() {
572                 return uint64(t.bytesLeft())
573         } else {
574                 return math.MaxUint64
575         }
576 }
577
578 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
579         if t.pieceComplete(piece) {
580                 return false
581         }
582         if t.pieceAllDirty(piece) {
583                 return false
584         }
585         return t.pieces[piece].hasDirtyChunks()
586 }
587
588 func (t *Torrent) usualPieceSize() int {
589         return int(t.info.PieceLength)
590 }
591
592 func (t *Torrent) numPieces() int {
593         return t.info.NumPieces()
594 }
595
596 func (t *Torrent) numPiecesCompleted() (num int) {
597         return t.completedPieces.Len()
598 }
599
600 func (t *Torrent) close() (err error) {
601         t.closed.Set()
602         if t.storage != nil {
603                 t.storage.Close()
604         }
605         for conn := range t.conns {
606                 conn.Close()
607         }
608         t.cl.event.Broadcast()
609         t.pieceStateChanges.Close()
610         t.updateWantPeersEvent()
611         return
612 }
613
614 func (t *Torrent) requestOffset(r request) int64 {
615         return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
616 }
617
618 // Return the request that would include the given offset into the torrent
619 // data. Returns !ok if there is no such request.
620 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
621         return torrentOffsetRequest(t.length, t.info.PieceLength, int64(t.chunkSize), off)
622 }
623
624 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
625         tr := perf.NewTimer()
626
627         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
628         if err == nil && n != len(data) {
629                 err = io.ErrShortWrite
630         }
631         if err == nil {
632                 tr.Mark("write chunk")
633         }
634         return
635 }
636
637 func (t *Torrent) bitfield() (bf []bool) {
638         bf = make([]bool, t.numPieces())
639         t.completedPieces.IterTyped(func(piece int) (again bool) {
640                 bf[piece] = true
641                 return true
642         })
643         return
644 }
645
646 func (t *Torrent) pieceNumChunks(piece int) int {
647         return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
648 }
649
650 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
651         t.pieces[pieceIndex].dirtyChunks.Clear()
652 }
653
654 type Peer struct {
655         Id     [20]byte
656         IP     net.IP
657         Port   int
658         Source peerSource
659         // Peer is known to support encryption.
660         SupportsEncryption bool
661 }
662
663 func (t *Torrent) pieceLength(piece int) pp.Integer {
664         if piece == t.numPieces()-1 {
665                 ret := pp.Integer(t.length % t.info.PieceLength)
666                 if ret != 0 {
667                         return ret
668                 }
669         }
670         return pp.Integer(t.info.PieceLength)
671 }
672
673 func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
674         hash := pieceHash.New()
675         p := &t.pieces[piece]
676         p.waitNoPendingWrites()
677         ip := t.info.Piece(piece)
678         pl := ip.Length()
679         n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
680         if n == pl {
681                 missinggo.CopyExact(&ret, hash.Sum(nil))
682                 return
683         }
684         if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
685                 log.Printf("unexpected error hashing piece with %T: %s", t.storage.TorrentImpl, err)
686         }
687         return
688 }
689
690 func (t *Torrent) haveAnyPieces() bool {
691         for i := range t.pieces {
692                 if t.pieceComplete(i) {
693                         return true
694                 }
695         }
696         return false
697 }
698
699 func (t *Torrent) havePiece(index int) bool {
700         return t.haveInfo() && t.pieceComplete(index)
701 }
702
703 func (t *Torrent) haveChunk(r request) (ret bool) {
704         // defer func() {
705         //      log.Println("have chunk", r, ret)
706         // }()
707         if !t.haveInfo() {
708                 return false
709         }
710         if t.pieceComplete(int(r.Index)) {
711                 return true
712         }
713         p := &t.pieces[r.Index]
714         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
715 }
716
717 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
718         return int(cs.Begin / chunkSize)
719 }
720
721 func (t *Torrent) wantPiece(r request) bool {
722         if !t.wantPieceIndex(int(r.Index)) {
723                 return false
724         }
725         if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
726                 return true
727         }
728         // TODO: What about pieces that were wanted, but aren't now, and aren't
729         // completed either? That used to be done here.
730         return false
731 }
732
733 func (t *Torrent) wantPieceIndex(index int) bool {
734         if !t.haveInfo() {
735                 return false
736         }
737         if index < 0 || index >= t.numPieces() {
738                 return false
739         }
740         p := &t.pieces[index]
741         if p.queuedForHash {
742                 return false
743         }
744         if p.hashing {
745                 return false
746         }
747         if t.pieceComplete(index) {
748                 return false
749         }
750         if t.pendingPieces.Contains(index) {
751                 return true
752         }
753         return !t.forReaderOffsetPieces(func(begin, end int) bool {
754                 return index < begin || index >= end
755         })
756 }
757
758 // The worst connection is one that hasn't been sent, or sent anything useful
759 // for the longest. A bad connection is one that usually sends us unwanted
760 // pieces, or has been in worser half of the established connections for more
761 // than a minute.
762 func (t *Torrent) worstBadConn() *connection {
763         wcs := worseConnSlice{t.unclosedConnsAsSlice()}
764         heap.Init(&wcs)
765         for wcs.Len() != 0 {
766                 c := heap.Pop(&wcs).(*connection)
767                 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
768                         return c
769                 }
770                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
771                         // Give connections 1 minute to prove themselves.
772                         if time.Since(c.completedHandshake) > time.Minute {
773                                 return c
774                         }
775                 }
776         }
777         return nil
778 }
779
780 type PieceStateChange struct {
781         Index int
782         PieceState
783 }
784
785 func (t *Torrent) publishPieceChange(piece int) {
786         cur := t.pieceState(piece)
787         p := &t.pieces[piece]
788         if cur != p.publicPieceState {
789                 p.publicPieceState = cur
790                 t.pieceStateChanges.Publish(PieceStateChange{
791                         piece,
792                         cur,
793                 })
794         }
795 }
796
797 func (t *Torrent) pieceNumPendingChunks(piece int) int {
798         if t.pieceComplete(piece) {
799                 return 0
800         }
801         return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
802 }
803
804 func (t *Torrent) pieceAllDirty(piece int) bool {
805         return t.pieces[piece].dirtyChunks.Len() == t.pieceNumChunks(piece)
806 }
807
808 func (t *Torrent) readersChanged() {
809         t.updateReaderPieces()
810         t.updateAllPiecePriorities()
811 }
812
813 func (t *Torrent) updateReaderPieces() {
814         t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities()
815 }
816
817 func (t *Torrent) readerPosChanged(from, to pieceRange) {
818         if from == to {
819                 return
820         }
821         t.updateReaderPieces()
822         // Order the ranges, high and low.
823         l, h := from, to
824         if l.begin > h.begin {
825                 l, h = h, l
826         }
827         if l.end < h.begin {
828                 // Two distinct ranges.
829                 t.updatePiecePriorities(l.begin, l.end)
830                 t.updatePiecePriorities(h.begin, h.end)
831         } else {
832                 // Ranges overlap.
833                 end := l.end
834                 if h.end > end {
835                         end = h.end
836                 }
837                 t.updatePiecePriorities(l.begin, end)
838         }
839 }
840
841 func (t *Torrent) maybeNewConns() {
842         // Tickle the accept routine.
843         t.cl.event.Broadcast()
844         t.openNewConns()
845 }
846
847 func (t *Torrent) piecePriorityChanged(piece int) {
848         for c := range t.conns {
849                 if c.updatePiecePriority(piece) {
850                         c.updateRequests()
851                 }
852         }
853         t.maybeNewConns()
854         t.publishPieceChange(piece)
855 }
856
857 func (t *Torrent) updatePiecePriority(piece int) {
858         p := &t.pieces[piece]
859         newPrio := t.piecePriorityUncached(piece)
860         if newPrio == p.priority {
861                 return
862         }
863         p.priority = newPrio
864         t.piecePriorityChanged(piece)
865 }
866
867 func (t *Torrent) updateAllPiecePriorities() {
868         t.updatePiecePriorities(0, len(t.pieces))
869 }
870
871 // Update all piece priorities in one hit. This function should have the same
872 // output as updatePiecePriority, but across all pieces.
873 func (t *Torrent) updatePiecePriorities(begin, end int) {
874         for i := begin; i < end; i++ {
875                 t.updatePiecePriority(i)
876         }
877 }
878
879 // Returns the range of pieces [begin, end) that contains the extent of bytes.
880 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
881         if off >= t.length {
882                 return
883         }
884         if off < 0 {
885                 size += off
886                 off = 0
887         }
888         if size <= 0 {
889                 return
890         }
891         begin = int(off / t.info.PieceLength)
892         end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
893         if end > t.info.NumPieces() {
894                 end = t.info.NumPieces()
895         }
896         return
897 }
898
899 // Returns true if all iterations complete without breaking. Returns the read
900 // regions for all readers. The reader regions should not be merged as some
901 // callers depend on this method to enumerate readers.
902 func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
903         for r := range t.readers {
904                 p := r.pieces
905                 if p.begin >= p.end {
906                         continue
907                 }
908                 if !f(p.begin, p.end) {
909                         return false
910                 }
911         }
912         return true
913 }
914
915 func (t *Torrent) piecePriority(piece int) piecePriority {
916         if !t.haveInfo() {
917                 return PiecePriorityNone
918         }
919         return t.pieces[piece].priority
920 }
921
922 func (t *Torrent) piecePriorityUncached(piece int) piecePriority {
923         if t.pieceComplete(piece) {
924                 return PiecePriorityNone
925         }
926         if t.readerNowPieces.Contains(piece) {
927                 return PiecePriorityNow
928         }
929         // if t.readerNowPieces.Contains(piece - 1) {
930         //      return PiecePriorityNext
931         // }
932         if t.readerReadaheadPieces.Contains(piece) {
933                 return PiecePriorityReadahead
934         }
935         if t.pendingPieces.Contains(piece) {
936                 return PiecePriorityNormal
937         }
938         return PiecePriorityNone
939 }
940
941 func (t *Torrent) pendPiece(piece int) {
942         if t.pendingPieces.Contains(piece) {
943                 return
944         }
945         if t.havePiece(piece) {
946                 return
947         }
948         t.pendingPieces.Add(piece)
949         t.updatePiecePriority(piece)
950 }
951
952 func (t *Torrent) unpendPieces(unpend bitmap.Bitmap) {
953         t.pendingPieces.Sub(unpend)
954         unpend.IterTyped(func(piece int) (again bool) {
955                 t.updatePiecePriority(piece)
956                 return true
957         })
958 }
959
960 func (t *Torrent) pendPieceRange(begin, end int) {
961         for i := begin; i < end; i++ {
962                 t.pendPiece(i)
963         }
964 }
965
966 func (t *Torrent) unpendPieceRange(begin, end int) {
967         var bm bitmap.Bitmap
968         bm.AddRange(begin, end)
969         t.unpendPieces(bm)
970 }
971
972 func (t *Torrent) pendRequest(req request) {
973         ci := chunkIndex(req.chunkSpec, t.chunkSize)
974         t.pieces[req.Index].pendChunkIndex(ci)
975 }
976
977 func (t *Torrent) pieceCompletionChanged(piece int) {
978         t.cl.event.Broadcast()
979         if t.pieceComplete(piece) {
980                 t.onPieceCompleted(piece)
981         } else {
982                 t.onIncompletePiece(piece)
983         }
984         t.updatePiecePriority(piece)
985 }
986
987 func (t *Torrent) openNewConns() {
988         t.cl.openNewConns(t)
989 }
990
991 func (t *Torrent) getConnPieceInclination() []int {
992         _ret := t.connPieceInclinationPool.Get()
993         if _ret == nil {
994                 pieceInclinationsNew.Add(1)
995                 return rand.Perm(t.numPieces())
996         }
997         pieceInclinationsReused.Add(1)
998         return _ret.([]int)
999 }
1000
1001 func (t *Torrent) putPieceInclination(pi []int) {
1002         t.connPieceInclinationPool.Put(pi)
1003         pieceInclinationsPut.Add(1)
1004 }
1005
1006 func (t *Torrent) updatePieceCompletion(piece int) {
1007         pcu := t.pieceCompleteUncached(piece)
1008         changed := t.completedPieces.Get(piece) != pcu
1009         t.completedPieces.Set(piece, pcu)
1010         if changed {
1011                 t.pieceCompletionChanged(piece)
1012         }
1013 }
1014
1015 // Non-blocking read. Client lock is not required.
1016 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1017         p := &t.pieces[off/t.info.PieceLength]
1018         p.waitNoPendingWrites()
1019         return p.Storage().ReadAt(b, off-p.Info().Offset())
1020 }
1021
1022 func (t *Torrent) updateAllPieceCompletions() {
1023         for i := range iter.N(t.numPieces()) {
1024                 t.updatePieceCompletion(i)
1025         }
1026 }
1027
1028 // Returns an error if the metadata was completed, but couldn't be set for
1029 // some reason. Blame it on the last peer to contribute.
1030 func (t *Torrent) maybeCompleteMetadata() error {
1031         if t.haveInfo() {
1032                 // Nothing to do.
1033                 return nil
1034         }
1035         if !t.haveAllMetadataPieces() {
1036                 // Don't have enough metadata pieces.
1037                 return nil
1038         }
1039         err := t.setInfoBytes(t.metadataBytes)
1040         if err != nil {
1041                 t.invalidateMetadata()
1042                 return fmt.Errorf("error setting info bytes: %s", err)
1043         }
1044         if t.cl.config.Debug {
1045                 log.Printf("%s: got metadata from peers", t)
1046         }
1047         return nil
1048 }
1049
1050 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1051         t.forReaderOffsetPieces(func(begin, end int) bool {
1052                 ret.AddRange(begin, end)
1053                 return true
1054         })
1055         return
1056 }
1057
1058 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1059         t.forReaderOffsetPieces(func(begin, end int) bool {
1060                 if end > begin {
1061                         now.Add(begin)
1062                         readahead.AddRange(begin+1, end)
1063                 }
1064                 return true
1065         })
1066         return
1067 }
1068
1069 func (t *Torrent) needData() bool {
1070         if t.closed.IsSet() {
1071                 return false
1072         }
1073         if !t.haveInfo() {
1074                 return true
1075         }
1076         if t.pendingPieces.Len() != 0 {
1077                 return true
1078         }
1079         // Read as "not all complete".
1080         return !t.readerPieces().IterTyped(func(piece int) bool {
1081                 return t.pieceComplete(piece)
1082         })
1083 }
1084
1085 func appendMissingStrings(old, new []string) (ret []string) {
1086         ret = old
1087 new:
1088         for _, n := range new {
1089                 for _, o := range old {
1090                         if o == n {
1091                                 continue new
1092                         }
1093                 }
1094                 ret = append(ret, n)
1095         }
1096         return
1097 }
1098
1099 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1100         ret = existing
1101         for minNumTiers > len(ret) {
1102                 ret = append(ret, nil)
1103         }
1104         return
1105 }
1106
1107 func (t *Torrent) addTrackers(announceList [][]string) {
1108         fullAnnounceList := &t.metainfo.AnnounceList
1109         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1110         for tierIndex, trackerURLs := range announceList {
1111                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1112         }
1113         t.startMissingTrackerScrapers()
1114         t.updateWantPeersEvent()
1115 }
1116
1117 // Don't call this before the info is available.
1118 func (t *Torrent) bytesCompleted() int64 {
1119         if !t.haveInfo() {
1120                 return 0
1121         }
1122         return t.info.TotalLength() - t.bytesLeft()
1123 }
1124
1125 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1126         t.cl.mu.Lock()
1127         defer t.cl.mu.Unlock()
1128         return t.setInfoBytes(b)
1129 }
1130
1131 // Returns true if connection is removed from torrent.Conns.
1132 func (t *Torrent) deleteConnection(c *connection) (ret bool) {
1133         _, ret = t.conns[c]
1134         delete(t.conns, c)
1135         return
1136 }
1137
1138 func (t *Torrent) dropConnection(c *connection) {
1139         t.cl.event.Broadcast()
1140         c.Close()
1141         if t.deleteConnection(c) {
1142                 t.openNewConns()
1143         }
1144 }
1145
1146 func (t *Torrent) wantPeers() bool {
1147         if t.closed.IsSet() {
1148                 return false
1149         }
1150         if len(t.peers) > torrentPeersLowWater {
1151                 return false
1152         }
1153         return t.needData() || t.seeding()
1154 }
1155
1156 func (t *Torrent) updateWantPeersEvent() {
1157         if t.wantPeers() {
1158                 t.wantPeersEvent.Set()
1159         } else {
1160                 t.wantPeersEvent.Clear()
1161         }
1162 }
1163
1164 // Returns whether the client should make effort to seed the torrent.
1165 func (t *Torrent) seeding() bool {
1166         cl := t.cl
1167         if t.closed.IsSet() {
1168                 return false
1169         }
1170         if cl.config.NoUpload {
1171                 return false
1172         }
1173         if !cl.config.Seed {
1174                 return false
1175         }
1176         if t.needData() {
1177                 return false
1178         }
1179         return true
1180 }
1181
1182 func (t *Torrent) startScrapingTracker(url string) {
1183         if url == "" {
1184                 return
1185         }
1186         if _, ok := t.trackerAnnouncers[url]; ok {
1187                 return
1188         }
1189         newAnnouncer := &trackerScraper{
1190                 url: url,
1191                 t:   t,
1192         }
1193         if t.trackerAnnouncers == nil {
1194                 t.trackerAnnouncers = make(map[string]*trackerScraper)
1195         }
1196         t.trackerAnnouncers[url] = newAnnouncer
1197         go newAnnouncer.Run()
1198 }
1199
1200 // Adds and starts tracker scrapers for tracker URLs that aren't already
1201 // running.
1202 func (t *Torrent) startMissingTrackerScrapers() {
1203         if t.cl.config.DisableTrackers {
1204                 return
1205         }
1206         t.startScrapingTracker(t.metainfo.Announce)
1207         for _, tier := range t.metainfo.AnnounceList {
1208                 for _, url := range tier {
1209                         t.startScrapingTracker(url)
1210                 }
1211         }
1212 }
1213
1214 // Returns an AnnounceRequest with fields filled out to defaults and current
1215 // values.
1216 func (t *Torrent) announceRequest() tracker.AnnounceRequest {
1217         return tracker.AnnounceRequest{
1218                 Event:    tracker.None,
1219                 NumWant:  -1,
1220                 Port:     uint16(t.cl.incomingPeerPort()),
1221                 PeerId:   t.cl.peerID,
1222                 InfoHash: t.infoHash,
1223                 Left:     t.bytesLeftAnnounce(),
1224         }
1225 }
1226
1227 // Adds peers revealed in an announce until the announce ends, or we have
1228 // enough peers.
1229 func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
1230         cl := t.cl
1231         // Count all the unique addresses we got during this announce.
1232         allAddrs := make(map[string]struct{})
1233         for {
1234                 select {
1235                 case v, ok := <-pvs:
1236                         if !ok {
1237                                 return
1238                         }
1239                         addPeers := make([]Peer, 0, len(v.Peers))
1240                         for _, cp := range v.Peers {
1241                                 if cp.Port == 0 {
1242                                         // Can't do anything with this.
1243                                         continue
1244                                 }
1245                                 addPeers = append(addPeers, Peer{
1246                                         IP:     cp.IP[:],
1247                                         Port:   cp.Port,
1248                                         Source: peerSourceDHTGetPeers,
1249                                 })
1250                                 key := (&net.UDPAddr{
1251                                         IP:   cp.IP[:],
1252                                         Port: cp.Port,
1253                                 }).String()
1254                                 allAddrs[key] = struct{}{}
1255                         }
1256                         cl.mu.Lock()
1257                         t.addPeers(addPeers)
1258                         numPeers := len(t.peers)
1259                         cl.mu.Unlock()
1260                         if numPeers >= torrentPeersHighWater {
1261                                 return
1262                         }
1263                 case <-t.closed.LockedChan(&cl.mu):
1264                         return
1265                 }
1266         }
1267 }
1268
1269 func (t *Torrent) announceDHT(impliedPort bool) (err error) {
1270         cl := t.cl
1271         ps, err := cl.dHT.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort)
1272         if err != nil {
1273                 return
1274         }
1275         t.consumeDHTAnnounce(ps.Peers)
1276         ps.Close()
1277         return
1278 }
1279
1280 func (t *Torrent) dhtAnnouncer() {
1281         cl := t.cl
1282         for {
1283                 select {
1284                 case <-t.wantPeersEvent.LockedChan(&cl.mu):
1285                 case <-t.closed.LockedChan(&cl.mu):
1286                         return
1287                 }
1288                 err := t.announceDHT(true)
1289                 func() {
1290                         cl.mu.Lock()
1291                         defer cl.mu.Unlock()
1292                         if err == nil {
1293                                 t.numDHTAnnounces++
1294                         } else {
1295                                 log.Printf("error announcing %q to DHT: %s", t, err)
1296                         }
1297                 }()
1298                 select {
1299                 case <-t.closed.LockedChan(&cl.mu):
1300                         return
1301                 case <-time.After(5 * time.Minute):
1302                 }
1303         }
1304 }
1305
1306 func (t *Torrent) addPeers(peers []Peer) {
1307         for _, p := range peers {
1308                 if t.cl.badPeerIPPort(p.IP, p.Port) {
1309                         continue
1310                 }
1311                 t.addPeer(p)
1312         }
1313 }
1314
1315 func (t *Torrent) Stats() TorrentStats {
1316         t.cl.mu.Lock()
1317         defer t.cl.mu.Unlock()
1318
1319         t.stats.ActivePeers = len(t.conns)
1320         t.stats.HalfOpenPeers = len(t.halfOpen)
1321         t.stats.PendingPeers = len(t.peers)
1322         t.stats.TotalPeers = t.numTotalPeers()
1323
1324         return t.stats
1325 }
1326
1327 // The total number of peers in the torrent.
1328 func (t *Torrent) numTotalPeers() int {
1329         peers := make(map[string]struct{})
1330         for conn := range t.conns {
1331                 peers[conn.conn.RemoteAddr().String()] = struct{}{}
1332         }
1333         for addr := range t.halfOpen {
1334                 peers[addr] = struct{}{}
1335         }
1336         for _, peer := range t.peers {
1337                 peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{}
1338         }
1339         return len(peers)
1340 }
1341
1342 // Returns true if the connection is added.
1343 func (t *Torrent) addConnection(c *connection, outgoing bool) bool {
1344         if t.cl.closed.IsSet() {
1345                 return false
1346         }
1347         if !t.wantConns() {
1348                 return false
1349         }
1350         for c0 := range t.conns {
1351                 if c.PeerID == c0.PeerID {
1352                         // Already connected to a client with that ID.
1353                         duplicateClientConns.Add(1)
1354                         lower := string(t.cl.peerID[:]) < string(c.PeerID[:])
1355                         // Retain the connection from initiated from lower peer ID to
1356                         // higher.
1357                         if outgoing == lower {
1358                                 // Close the other one.
1359                                 c0.Close()
1360                                 // Is it safe to delete from the map while we're iterating
1361                                 // over it?
1362                                 t.deleteConnection(c0)
1363                         } else {
1364                                 // Abandon this one.
1365                                 return false
1366                         }
1367                 }
1368         }
1369         if len(t.conns) >= t.maxEstablishedConns {
1370                 c := t.worstBadConn()
1371                 if c == nil {
1372                         return false
1373                 }
1374                 if t.cl.config.Debug && missinggo.CryHeard() {
1375                         log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
1376                 }
1377                 c.Close()
1378                 t.deleteConnection(c)
1379         }
1380         if len(t.conns) >= t.maxEstablishedConns {
1381                 panic(len(t.conns))
1382         }
1383         if c.t != nil {
1384                 panic("connection already associated with a torrent")
1385         }
1386         // Reconcile bytes transferred before connection was associated with a
1387         // torrent.
1388         t.stats.wroteBytes(c.stats.BytesWritten)
1389         t.stats.readBytes(c.stats.BytesRead)
1390         c.t = t
1391         t.conns[c] = struct{}{}
1392         return true
1393 }
1394
1395 func (t *Torrent) wantConns() bool {
1396         if !t.networkingEnabled {
1397                 return false
1398         }
1399         if t.closed.IsSet() {
1400                 return false
1401         }
1402         if !t.seeding() && !t.needData() {
1403                 return false
1404         }
1405         if len(t.conns) < t.maxEstablishedConns {
1406                 return true
1407         }
1408         return t.worstBadConn() != nil
1409 }
1410
1411 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1412         t.cl.mu.Lock()
1413         defer t.cl.mu.Unlock()
1414         oldMax = t.maxEstablishedConns
1415         t.maxEstablishedConns = max
1416         wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
1417         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1418                 t.dropConnection(wcs.Pop().(*connection))
1419         }
1420         t.openNewConns()
1421         return oldMax
1422 }
1423
1424 func (t *Torrent) mu() missinggo.RWLocker {
1425         return &t.cl.mu
1426 }
1427
1428 func (t *Torrent) pieceHashed(piece int, correct bool) {
1429         if t.closed.IsSet() {
1430                 return
1431         }
1432         p := &t.pieces[piece]
1433         touchers := t.reapPieceTouchers(piece)
1434         if p.everHashed {
1435                 // Don't score the first time a piece is hashed, it could be an
1436                 // initial check.
1437                 if correct {
1438                         pieceHashedCorrect.Add(1)
1439                 } else {
1440                         log.Printf("%s: piece %d (%s) failed hash: %d connections contributed", t, piece, p.hash, len(touchers))
1441                         pieceHashedNotCorrect.Add(1)
1442                 }
1443         }
1444         p.everHashed = true
1445         if correct {
1446                 for _, c := range touchers {
1447                         c.goodPiecesDirtied++
1448                 }
1449                 err := p.Storage().MarkComplete()
1450                 if err != nil {
1451                         log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
1452                 }
1453                 t.updatePieceCompletion(piece)
1454         } else {
1455                 if len(touchers) != 0 {
1456                         for _, c := range touchers {
1457                                 // Y u do dis peer?!
1458                                 c.badPiecesDirtied++
1459                         }
1460                         slices.Sort(touchers, connLessTrusted)
1461                         log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int) {
1462                                 for _, c := range touchers {
1463                                         ret = append(ret, c.netGoodPiecesDirtied())
1464                                 }
1465                                 return
1466                         }())
1467                         c := touchers[0]
1468                         t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
1469                         c.Drop()
1470                 }
1471                 t.onIncompletePiece(piece)
1472         }
1473 }
1474
1475 func (t *Torrent) cancelRequestsForPiece(piece int) {
1476         for cn := range t.conns {
1477                 cn.tickleWriter()
1478         }
1479 }
1480
1481 func (t *Torrent) onPieceCompleted(piece int) {
1482         t.pendingPieces.Remove(piece)
1483         t.pendAllChunkSpecs(piece)
1484         t.cancelRequestsForPiece(piece)
1485         for conn := range t.conns {
1486                 conn.Have(piece)
1487         }
1488 }
1489
1490 func (t *Torrent) onIncompletePiece(piece int) {
1491         if t.pieceAllDirty(piece) {
1492                 t.pendAllChunkSpecs(piece)
1493         }
1494         if !t.wantPieceIndex(piece) {
1495                 return
1496         }
1497         // We could drop any connections that we told we have a piece that we
1498         // don't here. But there's a test failure, and it seems clients don't care
1499         // if you request pieces that you already claim to have. Pruning bad
1500         // connections might just remove any connections that aren't treating us
1501         // favourably anyway.
1502
1503         // for c := range t.conns {
1504         //      if c.sentHave(piece) {
1505         //              c.Drop()
1506         //      }
1507         // }
1508         for conn := range t.conns {
1509                 if conn.PeerHasPiece(piece) {
1510                         conn.updateRequests()
1511                 }
1512         }
1513 }
1514
1515 func (t *Torrent) verifyPiece(piece int) {
1516         cl := t.cl
1517         cl.mu.Lock()
1518         defer cl.mu.Unlock()
1519         p := &t.pieces[piece]
1520         for p.hashing || t.storage == nil {
1521                 cl.event.Wait()
1522         }
1523         p.queuedForHash = false
1524         if t.closed.IsSet() || t.pieceComplete(piece) {
1525                 t.updatePiecePriority(piece)
1526                 return
1527         }
1528         p.hashing = true
1529         t.publishPieceChange(piece)
1530         cl.mu.Unlock()
1531         sum := t.hashPiece(piece)
1532         cl.mu.Lock()
1533         p.numVerifies++
1534         p.hashing = false
1535         t.pieceHashed(piece, sum == p.hash)
1536 }
1537
1538 // Return the connections that touched a piece, and clear the entry while
1539 // doing it.
1540 func (t *Torrent) reapPieceTouchers(piece int) (ret []*connection) {
1541         for c := range t.conns {
1542                 if _, ok := c.peerTouchedPieces[piece]; ok {
1543                         ret = append(ret, c)
1544                         delete(c.peerTouchedPieces, piece)
1545                 }
1546         }
1547         return
1548 }
1549
1550 func (t *Torrent) connsAsSlice() (ret []*connection) {
1551         for c := range t.conns {
1552                 ret = append(ret, c)
1553         }
1554         return
1555 }
1556
1557 // Currently doesn't really queue, but should in the future.
1558 func (t *Torrent) queuePieceCheck(pieceIndex int) {
1559         piece := &t.pieces[pieceIndex]
1560         if piece.queuedForHash {
1561                 return
1562         }
1563         piece.queuedForHash = true
1564         t.publishPieceChange(pieceIndex)
1565         go t.verifyPiece(pieceIndex)
1566 }
1567
1568 func (t *Torrent) VerifyData() {
1569         for i := range iter.N(t.NumPieces()) {
1570                 t.Piece(i).VerifyData()
1571         }
1572 }