]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Kick only the worst connection when a piece fails a check
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "crypto/sha1"
6         "errors"
7         "fmt"
8         "io"
9         "log"
10         "math"
11         "math/rand"
12         "net"
13         "os"
14         "sync"
15         "text/tabwriter"
16         "time"
17
18         "github.com/anacrolix/missinggo"
19         "github.com/anacrolix/missinggo/bitmap"
20         "github.com/anacrolix/missinggo/perf"
21         "github.com/anacrolix/missinggo/pubsub"
22         "github.com/anacrolix/missinggo/slices"
23         "github.com/bradfitz/iter"
24
25         "github.com/anacrolix/torrent/bencode"
26         "github.com/anacrolix/torrent/dht"
27         "github.com/anacrolix/torrent/metainfo"
28         pp "github.com/anacrolix/torrent/peer_protocol"
29         "github.com/anacrolix/torrent/storage"
30         "github.com/anacrolix/torrent/tracker"
31 )
32
33 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
34         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
35 }
36
37 type peersKey struct {
38         IPBytes string
39         Port    int
40 }
41
42 // Maintains state of torrent within a Client.
43 type Torrent struct {
44         cl *Client
45
46         closed   missinggo.Event
47         infoHash metainfo.Hash
48         pieces   []piece
49         // Values are the piece indices that changed.
50         pieceStateChanges *pubsub.PubSub
51         // The size of chunks to request from peers over the wire. This is
52         // normally 16KiB by convention these days.
53         chunkSize pp.Integer
54         chunkPool *sync.Pool
55         // Total length of the torrent in bytes. Stored because it's not O(1) to
56         // get this from the info dict.
57         length int64
58
59         // The storage to open when the info dict becomes available.
60         storageOpener *storage.Client
61         // Storage for torrent data.
62         storage *storage.Torrent
63
64         metainfo metainfo.MetaInfo
65
66         // The info dict. nil if we don't have it (yet).
67         info *metainfo.Info
68         // Active peer connections, running message stream loops.
69         conns               map[*connection]struct{}
70         maxEstablishedConns int
71         // Set of addrs to which we're attempting to connect. Connections are
72         // half-open until all handshakes are completed.
73         halfOpen map[string]struct{}
74
75         // Reserve of peers to connect to. A peer can be both here and in the
76         // active connections if were told about the peer after connecting with
77         // them. That encourages us to reconnect to peers that are well known in
78         // the swarm.
79         peers          map[peersKey]Peer
80         wantPeersEvent missinggo.Event
81         // An announcer for each tracker URL.
82         trackerAnnouncers map[string]*trackerScraper
83         // How many times we've initiated a DHT announce. TODO: Move into stats.
84         numDHTAnnounces int
85
86         // Name used if the info name isn't available. Should be cleared when the
87         // Info does become available.
88         displayName string
89         // The bencoded bytes of the info dict. This is actively manipulated if
90         // the info bytes aren't initially available, and we try to fetch them
91         // from peers.
92         metadataBytes []byte
93         // Each element corresponds to the 16KiB metadata pieces. If true, we have
94         // received that piece.
95         metadataCompletedChunks []bool
96
97         // Set when .Info is obtained.
98         gotMetainfo missinggo.Event
99
100         readers               map[*Reader]struct{}
101         readerNowPieces       bitmap.Bitmap
102         readerReadaheadPieces bitmap.Bitmap
103
104         // The indexes of pieces we want with normal priority, that aren't
105         // currently available.
106         pendingPieces bitmap.Bitmap
107         // A cache of completed piece indices.
108         completedPieces bitmap.Bitmap
109
110         // A pool of piece priorities []int for assignment to new connections.
111         // These "inclinations" are used to give connections preference for
112         // different pieces.
113         connPieceInclinationPool sync.Pool
114         // Torrent-level statistics.
115         stats TorrentStats
116 }
117
118 func (t *Torrent) setChunkSize(size pp.Integer) {
119         t.chunkSize = size
120         t.chunkPool = &sync.Pool{
121                 New: func() interface{} {
122                         return make([]byte, size)
123                 },
124         }
125 }
126
127 func (t *Torrent) setDisplayName(dn string) {
128         if t.haveInfo() {
129                 return
130         }
131         t.displayName = dn
132 }
133
134 func (t *Torrent) pieceComplete(piece int) bool {
135         return t.completedPieces.Get(piece)
136 }
137
138 func (t *Torrent) pieceCompleteUncached(piece int) bool {
139         return t.pieces[piece].Storage().GetIsComplete()
140 }
141
142 func (t *Torrent) numConnsUnchoked() (num int) {
143         for c := range t.conns {
144                 if !c.PeerChoked {
145                         num++
146                 }
147         }
148         return
149 }
150
151 // There's a connection to that address already.
152 func (t *Torrent) addrActive(addr string) bool {
153         if _, ok := t.halfOpen[addr]; ok {
154                 return true
155         }
156         for c := range t.conns {
157                 if c.remoteAddr().String() == addr {
158                         return true
159                 }
160         }
161         return false
162 }
163
164 func (t *Torrent) worstUnclosedConns() (ret []*connection) {
165         ret = make([]*connection, 0, len(t.conns))
166         for c := range t.conns {
167                 if !c.closed.IsSet() {
168                         ret = append(ret, c)
169                 }
170         }
171         return
172 }
173
174 func (t *Torrent) addPeer(p Peer) {
175         cl := t.cl
176         cl.openNewConns(t)
177         if len(t.peers) >= torrentPeersHighWater {
178                 return
179         }
180         key := peersKey{string(p.IP), p.Port}
181         if _, ok := t.peers[key]; ok {
182                 return
183         }
184         t.peers[key] = p
185         peersAddedBySource.Add(string(p.Source), 1)
186         cl.openNewConns(t)
187
188 }
189
190 func (t *Torrent) invalidateMetadata() {
191         for i := range t.metadataCompletedChunks {
192                 t.metadataCompletedChunks[i] = false
193         }
194         t.info = nil
195 }
196
197 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
198         if t.haveInfo() {
199                 return
200         }
201         if index >= len(t.metadataCompletedChunks) {
202                 log.Printf("%s: ignoring metadata piece %d", t, index)
203                 return
204         }
205         copy(t.metadataBytes[(1<<14)*index:], data)
206         t.metadataCompletedChunks[index] = true
207 }
208
209 func (t *Torrent) metadataPieceCount() int {
210         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
211 }
212
213 func (t *Torrent) haveMetadataPiece(piece int) bool {
214         if t.haveInfo() {
215                 return (1<<14)*piece < len(t.metadataBytes)
216         } else {
217                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
218         }
219 }
220
221 func (t *Torrent) metadataSizeKnown() bool {
222         return t.metadataBytes != nil
223 }
224
225 func (t *Torrent) metadataSize() int {
226         return len(t.metadataBytes)
227 }
228
229 func infoPieceHashes(info *metainfo.Info) (ret []string) {
230         for i := 0; i < len(info.Pieces); i += sha1.Size {
231                 ret = append(ret, string(info.Pieces[i:i+sha1.Size]))
232         }
233         return
234 }
235
236 func (t *Torrent) makePieces() {
237         hashes := infoPieceHashes(t.info)
238         t.pieces = make([]piece, len(hashes))
239         for i, hash := range hashes {
240                 piece := &t.pieces[i]
241                 piece.t = t
242                 piece.index = i
243                 piece.noPendingWrites.L = &piece.pendingWritesMutex
244                 missinggo.CopyExact(piece.Hash[:], hash)
245         }
246 }
247
248 // Called when metadata for a torrent becomes available.
249 func (t *Torrent) setInfoBytes(b []byte) error {
250         if t.haveInfo() {
251                 return nil
252         }
253         if metainfo.HashBytes(b) != t.infoHash {
254                 return errors.New("info bytes have wrong hash")
255         }
256         var info metainfo.Info
257         err := bencode.Unmarshal(b, &info)
258         if err != nil {
259                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
260         }
261         err = validateInfo(&info)
262         if err != nil {
263                 return fmt.Errorf("bad info: %s", err)
264         }
265         defer t.updateWantPeersEvent()
266         t.info = &info
267         t.displayName = "" // Save a few bytes lol.
268         t.cl.event.Broadcast()
269         t.gotMetainfo.Set()
270         t.storage, err = t.storageOpener.OpenTorrent(t.info, t.infoHash)
271         if err != nil {
272                 return fmt.Errorf("error opening torrent storage: %s", err)
273         }
274         t.length = 0
275         for _, f := range t.info.UpvertedFiles() {
276                 t.length += f.Length
277         }
278         t.metadataBytes = b
279         t.metadataCompletedChunks = nil
280         t.makePieces()
281         for conn := range t.conns {
282                 if err := conn.setNumPieces(t.numPieces()); err != nil {
283                         log.Printf("closing connection: %s", err)
284                         conn.Close()
285                 }
286         }
287         for i := range t.pieces {
288                 t.updatePieceCompletion(i)
289                 t.pieces[i].QueuedForHash = true
290         }
291         go func() {
292                 for i := range t.pieces {
293                         t.verifyPiece(i)
294                 }
295         }()
296         return nil
297 }
298
299 func (t *Torrent) haveAllMetadataPieces() bool {
300         if t.haveInfo() {
301                 return true
302         }
303         if t.metadataCompletedChunks == nil {
304                 return false
305         }
306         for _, have := range t.metadataCompletedChunks {
307                 if !have {
308                         return false
309                 }
310         }
311         return true
312 }
313
314 // TODO: Propagate errors to disconnect peer.
315 func (t *Torrent) setMetadataSize(bytes int64) (err error) {
316         if t.haveInfo() {
317                 // We already know the correct metadata size.
318                 return
319         }
320         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
321                 return errors.New("bad size")
322         }
323         if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
324                 return
325         }
326         t.metadataBytes = make([]byte, bytes)
327         t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
328         for c := range t.conns {
329                 c.requestPendingMetadata()
330         }
331         return
332 }
333
334 // The current working name for the torrent. Either the name in the info dict,
335 // or a display name given such as by the dn value in a magnet link, or "".
336 func (t *Torrent) name() string {
337         if t.haveInfo() {
338                 return t.info.Name
339         }
340         return t.displayName
341 }
342
343 func (t *Torrent) pieceState(index int) (ret PieceState) {
344         p := &t.pieces[index]
345         ret.Priority = t.piecePriority(index)
346         if t.pieceComplete(index) {
347                 ret.Complete = true
348         }
349         if p.QueuedForHash || p.Hashing {
350                 ret.Checking = true
351         }
352         if !ret.Complete && t.piecePartiallyDownloaded(index) {
353                 ret.Partial = true
354         }
355         return
356 }
357
358 func (t *Torrent) metadataPieceSize(piece int) int {
359         return metadataPieceSize(len(t.metadataBytes), piece)
360 }
361
362 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
363         d := map[string]int{
364                 "msg_type": msgType,
365                 "piece":    piece,
366         }
367         if data != nil {
368                 d["total_size"] = len(t.metadataBytes)
369         }
370         p, err := bencode.Marshal(d)
371         if err != nil {
372                 panic(err)
373         }
374         return pp.Message{
375                 Type:            pp.Extended,
376                 ExtendedID:      c.PeerExtensionIDs["ut_metadata"],
377                 ExtendedPayload: append(p, data...),
378         }
379 }
380
381 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
382         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
383                 ret = append(ret, PieceStateRun{
384                         PieceState: el.(PieceState),
385                         Length:     int(count),
386                 })
387         })
388         for index := range t.pieces {
389                 rle.Append(t.pieceState(index), 1)
390         }
391         rle.Flush()
392         return
393 }
394
395 // Produces a small string representing a PieceStateRun.
396 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
397         ret = fmt.Sprintf("%d", psr.Length)
398         ret += func() string {
399                 switch psr.Priority {
400                 case PiecePriorityNext:
401                         return "N"
402                 case PiecePriorityNormal:
403                         return "."
404                 case PiecePriorityReadahead:
405                         return "R"
406                 case PiecePriorityNow:
407                         return "!"
408                 default:
409                         return ""
410                 }
411         }()
412         if psr.Checking {
413                 ret += "H"
414         }
415         if psr.Partial {
416                 ret += "P"
417         }
418         if psr.Complete {
419                 ret += "C"
420         }
421         return
422 }
423
424 func (t *Torrent) writeStatus(w io.Writer) {
425         fmt.Fprintf(w, "Infohash: %x\n", t.infoHash)
426         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
427         if !t.haveInfo() {
428                 fmt.Fprintf(w, "Metadata have: ")
429                 for _, h := range t.metadataCompletedChunks {
430                         fmt.Fprintf(w, "%c", func() rune {
431                                 if h {
432                                         return 'H'
433                                 } else {
434                                         return '.'
435                                 }
436                         }())
437                 }
438                 fmt.Fprintln(w)
439         }
440         fmt.Fprintf(w, "Piece length: %s\n", func() string {
441                 if t.haveInfo() {
442                         return fmt.Sprint(t.usualPieceSize())
443                 } else {
444                         return "?"
445                 }
446         }())
447         if t.Info() != nil {
448                 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
449                 fmt.Fprint(w, "Piece States:")
450                 for _, psr := range t.PieceStateRuns() {
451                         w.Write([]byte(" "))
452                         w.Write([]byte(pieceStateRunStatusChars(psr)))
453                 }
454                 fmt.Fprintln(w)
455         }
456         fmt.Fprintf(w, "Reader Pieces:")
457         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
458                 fmt.Fprintf(w, " %d:%d", begin, end)
459                 return true
460         })
461         fmt.Fprintln(w)
462
463         fmt.Fprintf(w, "Trackers:\n")
464         func() {
465                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
466                 fmt.Fprintf(tw, "    URL\tNext announce\tLast announce\n")
467                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
468                         return l.url < r.url
469                 }).([]*trackerScraper) {
470                         fmt.Fprintf(tw, "    %s\n", ta.statusLine())
471                 }
472                 tw.Flush()
473         }()
474
475         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
476
477         fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
478         fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
479         fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
480         conns := t.connsAsSlice()
481         slices.Sort(conns, worseConn)
482         for i, c := range conns {
483                 fmt.Fprintf(w, "%2d. ", i+1)
484                 c.WriteStatus(w, t)
485         }
486 }
487
488 func (t *Torrent) haveInfo() bool {
489         return t.info != nil
490 }
491
492 // TODO: Include URIs that weren't converted to tracker clients.
493 func (t *Torrent) announceList() (al [][]string) {
494         return t.metainfo.AnnounceList
495 }
496
497 // Returns a run-time generated MetaInfo that includes the info bytes and
498 // announce-list as currently known to the client.
499 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
500         return metainfo.MetaInfo{
501                 CreationDate: time.Now().Unix(),
502                 Comment:      "dynamic metainfo from client",
503                 CreatedBy:    "go.torrent",
504                 AnnounceList: t.announceList(),
505                 InfoBytes:    t.metadataBytes,
506         }
507 }
508
509 func (t *Torrent) BytesMissing() int64 {
510         t.mu().RLock()
511         defer t.mu().RUnlock()
512         return t.bytesLeft()
513 }
514
515 func (t *Torrent) bytesLeft() (left int64) {
516         for i := 0; i < t.numPieces(); i++ {
517                 left += int64(t.pieces[i].bytesLeft())
518         }
519         return
520 }
521
522 // Bytes left to give in tracker announces.
523 func (t *Torrent) bytesLeftAnnounce() uint64 {
524         if t.haveInfo() {
525                 return uint64(t.bytesLeft())
526         } else {
527                 return math.MaxUint64
528         }
529 }
530
531 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
532         if t.pieceComplete(piece) {
533                 return false
534         }
535         if t.pieceAllDirty(piece) {
536                 return false
537         }
538         return t.pieces[piece].hasDirtyChunks()
539 }
540
541 func (t *Torrent) usualPieceSize() int {
542         return int(t.info.PieceLength)
543 }
544
545 func (t *Torrent) lastPieceSize() int {
546         return int(t.pieceLength(t.numPieces() - 1))
547 }
548
549 func (t *Torrent) numPieces() int {
550         return t.info.NumPieces()
551 }
552
553 func (t *Torrent) numPiecesCompleted() (num int) {
554         return t.completedPieces.Len()
555 }
556
557 func (t *Torrent) close() (err error) {
558         t.closed.Set()
559         if t.storage != nil {
560                 t.storage.Close()
561         }
562         for conn := range t.conns {
563                 conn.Close()
564         }
565         t.pieceStateChanges.Close()
566         t.updateWantPeersEvent()
567         return
568 }
569
570 func (t *Torrent) requestOffset(r request) int64 {
571         return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
572 }
573
574 // Return the request that would include the given offset into the torrent
575 // data. Returns !ok if there is no such request.
576 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
577         return torrentOffsetRequest(t.length, t.info.PieceLength, int64(t.chunkSize), off)
578 }
579
580 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
581         tr := perf.NewTimer()
582
583         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
584         if err == nil && n != len(data) {
585                 err = io.ErrShortWrite
586         }
587         if err == nil {
588                 tr.Stop("write chunk")
589         }
590         return
591 }
592
593 func (t *Torrent) bitfield() (bf []bool) {
594         bf = make([]bool, t.numPieces())
595         t.completedPieces.IterTyped(func(piece int) (again bool) {
596                 bf[piece] = true
597                 return true
598         })
599         return
600 }
601
602 func (t *Torrent) validOutgoingRequest(r request) bool {
603         if r.Index >= pp.Integer(t.info.NumPieces()) {
604                 return false
605         }
606         if r.Begin%t.chunkSize != 0 {
607                 return false
608         }
609         if r.Length > t.chunkSize {
610                 return false
611         }
612         pieceLength := t.pieceLength(int(r.Index))
613         if r.Begin+r.Length > pieceLength {
614                 return false
615         }
616         return r.Length == t.chunkSize || r.Begin+r.Length == pieceLength
617 }
618
619 func (t *Torrent) pieceChunks(piece int) (css []chunkSpec) {
620         css = make([]chunkSpec, 0, (t.pieceLength(piece)+t.chunkSize-1)/t.chunkSize)
621         var cs chunkSpec
622         for left := t.pieceLength(piece); left != 0; left -= cs.Length {
623                 cs.Length = left
624                 if cs.Length > t.chunkSize {
625                         cs.Length = t.chunkSize
626                 }
627                 css = append(css, cs)
628                 cs.Begin += cs.Length
629         }
630         return
631 }
632
633 func (t *Torrent) pieceNumChunks(piece int) int {
634         return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
635 }
636
637 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
638         t.pieces[pieceIndex].DirtyChunks.Clear()
639 }
640
641 type Peer struct {
642         Id     [20]byte
643         IP     net.IP
644         Port   int
645         Source peerSource
646         // Peer is known to support encryption.
647         SupportsEncryption bool
648 }
649
650 func (t *Torrent) pieceLength(piece int) (len_ pp.Integer) {
651         if piece < 0 || piece >= t.info.NumPieces() {
652                 return
653         }
654         if piece == t.numPieces()-1 {
655                 len_ = pp.Integer(t.length % t.info.PieceLength)
656         }
657         if len_ == 0 {
658                 len_ = pp.Integer(t.info.PieceLength)
659         }
660         return
661 }
662
663 func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
664         hash := pieceHash.New()
665         p := &t.pieces[piece]
666         p.waitNoPendingWrites()
667         ip := t.info.Piece(piece)
668         pl := ip.Length()
669         n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
670         if n == pl {
671                 missinggo.CopyExact(&ret, hash.Sum(nil))
672                 return
673         }
674         if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
675                 log.Printf("unexpected error hashing piece with %T: %s", t.storage, err)
676         }
677         return
678 }
679
680 func (t *Torrent) haveAllPieces() bool {
681         if !t.haveInfo() {
682                 return false
683         }
684         return t.completedPieces.Len() == t.numPieces()
685 }
686
687 func (t *Torrent) haveAnyPieces() bool {
688         for i := range t.pieces {
689                 if t.pieceComplete(i) {
690                         return true
691                 }
692         }
693         return false
694 }
695
696 func (t *Torrent) havePiece(index int) bool {
697         return t.haveInfo() && t.pieceComplete(index)
698 }
699
700 func (t *Torrent) haveChunk(r request) (ret bool) {
701         // defer func() {
702         //      log.Println("have chunk", r, ret)
703         // }()
704         if !t.haveInfo() {
705                 return false
706         }
707         if t.pieceComplete(int(r.Index)) {
708                 return true
709         }
710         p := &t.pieces[r.Index]
711         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
712 }
713
714 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
715         return int(cs.Begin / chunkSize)
716 }
717
718 func (t *Torrent) wantPiece(r request) bool {
719         if !t.wantPieceIndex(int(r.Index)) {
720                 return false
721         }
722         if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
723                 return true
724         }
725         // TODO: What about pieces that were wanted, but aren't now, and aren't
726         // completed either? That used to be done here.
727         return false
728 }
729
730 func (t *Torrent) wantPieceIndex(index int) bool {
731         if !t.haveInfo() {
732                 return false
733         }
734         if index < 0 || index >= t.numPieces() {
735                 return false
736         }
737         p := &t.pieces[index]
738         if p.QueuedForHash {
739                 return false
740         }
741         if p.Hashing {
742                 return false
743         }
744         if t.pieceComplete(index) {
745                 return false
746         }
747         if t.pendingPieces.Contains(index) {
748                 return true
749         }
750         return !t.forReaderOffsetPieces(func(begin, end int) bool {
751                 return index < begin || index >= end
752         })
753 }
754
755 func (t *Torrent) forNeededPieces(f func(piece int) (more bool)) (all bool) {
756         return t.forReaderOffsetPieces(func(begin, end int) (more bool) {
757                 for i := begin; begin < end; i++ {
758                         if !f(i) {
759                                 return false
760                         }
761                 }
762                 return true
763         })
764 }
765
766 func (t *Torrent) connHasWantedPieces(c *connection) bool {
767         return !c.pieceRequestOrder.IsEmpty()
768 }
769
770 func (t *Torrent) extentPieces(off, _len int64) (pieces []int) {
771         for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
772                 pieces = append(pieces, int(i))
773         }
774         return
775 }
776
777 // The worst connection is one that hasn't been sent, or sent anything useful
778 // for the longest. A bad connection is one that usually sends us unwanted
779 // pieces, or has been in worser half of the established connections for more
780 // than a minute.
781 func (t *Torrent) worstBadConn() *connection {
782         wcs := worseConnSlice{t.worstUnclosedConns()}
783         for wcs.Len() != 0 {
784                 c := heap.Pop(&wcs).(*connection)
785                 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
786                         return c
787                 }
788                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
789                         // Give connections 1 minute to prove themselves.
790                         if time.Since(c.completedHandshake) > time.Minute {
791                                 return c
792                         }
793                 }
794         }
795         return nil
796 }
797
798 type PieceStateChange struct {
799         Index int
800         PieceState
801 }
802
803 func (t *Torrent) publishPieceChange(piece int) {
804         cur := t.pieceState(piece)
805         p := &t.pieces[piece]
806         if cur != p.PublicPieceState {
807                 p.PublicPieceState = cur
808                 t.pieceStateChanges.Publish(PieceStateChange{
809                         piece,
810                         cur,
811                 })
812         }
813 }
814
815 func (t *Torrent) pieceNumPendingChunks(piece int) int {
816         if t.pieceComplete(piece) {
817                 return 0
818         }
819         return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
820 }
821
822 func (t *Torrent) pieceAllDirty(piece int) bool {
823         return t.pieces[piece].DirtyChunks.Len() == t.pieceNumChunks(piece)
824 }
825
826 func (t *Torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
827         return t.forReaderOffsetPieces(func(begin, end int) (again bool) {
828                 if begin < end {
829                         if !f(begin) {
830                                 return false
831                         }
832                 }
833                 return true
834         })
835 }
836
837 func (t *Torrent) readersChanged() {
838         t.updateReaderPieces()
839         t.updateAllPiecePriorities()
840 }
841
842 func (t *Torrent) updateReaderPieces() {
843         t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities()
844 }
845
846 func (t *Torrent) readerPosChanged(from, to pieceRange) {
847         if from == to {
848                 return
849         }
850         t.updateReaderPieces()
851         // Order the ranges, high and low.
852         l, h := from, to
853         if l.begin > h.begin {
854                 l, h = h, l
855         }
856         if l.end < h.begin {
857                 // Two distinct ranges.
858                 t.updatePiecePriorities(l.begin, l.end)
859                 t.updatePiecePriorities(h.begin, h.end)
860         } else {
861                 // Ranges overlap.
862                 end := l.end
863                 if h.end > end {
864                         end = h.end
865                 }
866                 t.updatePiecePriorities(l.begin, end)
867         }
868 }
869
870 func (t *Torrent) maybeNewConns() {
871         // Tickle the accept routine.
872         t.cl.event.Broadcast()
873         t.openNewConns()
874 }
875
876 func (t *Torrent) piecePriorityChanged(piece int) {
877         for c := range t.conns {
878                 c.updatePiecePriority(piece)
879         }
880         t.maybeNewConns()
881         t.publishPieceChange(piece)
882 }
883
884 func (t *Torrent) updatePiecePriority(piece int) {
885         p := &t.pieces[piece]
886         newPrio := t.piecePriorityUncached(piece)
887         if newPrio == p.priority {
888                 return
889         }
890         p.priority = newPrio
891         t.piecePriorityChanged(piece)
892 }
893
894 func (t *Torrent) updateAllPiecePriorities() {
895         t.updatePiecePriorities(0, len(t.pieces))
896 }
897
898 // Update all piece priorities in one hit. This function should have the same
899 // output as updatePiecePriority, but across all pieces.
900 func (t *Torrent) updatePiecePriorities(begin, end int) {
901         for i := begin; i < end; i++ {
902                 t.updatePiecePriority(i)
903         }
904 }
905
906 // Returns the range of pieces [begin, end) that contains the extent of bytes.
907 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
908         if off >= t.length {
909                 return
910         }
911         if off < 0 {
912                 size += off
913                 off = 0
914         }
915         if size <= 0 {
916                 return
917         }
918         begin = int(off / t.info.PieceLength)
919         end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
920         if end > t.info.NumPieces() {
921                 end = t.info.NumPieces()
922         }
923         return
924 }
925
926 // Returns true if all iterations complete without breaking. Returns the read
927 // regions for all readers. The reader regions should not be merged as some
928 // callers depend on this method to enumerate readers.
929 func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
930         for r := range t.readers {
931                 p := r.pieces
932                 if p.begin >= p.end {
933                         continue
934                 }
935                 if !f(p.begin, p.end) {
936                         return false
937                 }
938         }
939         return true
940 }
941
942 func (t *Torrent) piecePriority(piece int) piecePriority {
943         if !t.haveInfo() {
944                 return PiecePriorityNone
945         }
946         return t.pieces[piece].priority
947 }
948
949 func (t *Torrent) piecePriorityUncached(piece int) piecePriority {
950         if t.pieceComplete(piece) {
951                 return PiecePriorityNone
952         }
953         if t.readerNowPieces.Contains(piece) {
954                 return PiecePriorityNow
955         }
956         // if t.readerNowPieces.Contains(piece - 1) {
957         //      return PiecePriorityNext
958         // }
959         if t.readerReadaheadPieces.Contains(piece) {
960                 return PiecePriorityReadahead
961         }
962         if t.pendingPieces.Contains(piece) {
963                 return PiecePriorityNormal
964         }
965         return PiecePriorityNone
966 }
967
968 func (t *Torrent) pendPiece(piece int) {
969         if t.pendingPieces.Contains(piece) {
970                 return
971         }
972         if t.havePiece(piece) {
973                 return
974         }
975         t.pendingPieces.Add(piece)
976         t.updatePiecePriority(piece)
977 }
978
979 func (t *Torrent) getCompletedPieces() (ret bitmap.Bitmap) {
980         return t.completedPieces.Copy()
981 }
982
983 func (t *Torrent) unpendPieces(unpend *bitmap.Bitmap) {
984         t.pendingPieces.Sub(unpend)
985         unpend.IterTyped(func(piece int) (again bool) {
986                 t.updatePiecePriority(piece)
987                 return true
988         })
989 }
990
991 func (t *Torrent) pendPieceRange(begin, end int) {
992         for i := begin; i < end; i++ {
993                 t.pendPiece(i)
994         }
995 }
996
997 func (t *Torrent) unpendPieceRange(begin, end int) {
998         var bm bitmap.Bitmap
999         bm.AddRange(begin, end)
1000         t.unpendPieces(&bm)
1001 }
1002
1003 func (t *Torrent) pendRequest(req request) {
1004         ci := chunkIndex(req.chunkSpec, t.chunkSize)
1005         t.pieces[req.Index].pendChunkIndex(ci)
1006 }
1007
1008 func (t *Torrent) pieceCompletionChanged(piece int) {
1009         t.cl.event.Broadcast()
1010         if t.pieceComplete(piece) {
1011                 t.onPieceCompleted(piece)
1012         } else {
1013                 t.onIncompletePiece(piece)
1014         }
1015         t.updatePiecePriority(piece)
1016 }
1017
1018 func (t *Torrent) openNewConns() {
1019         t.cl.openNewConns(t)
1020 }
1021
1022 func (t *Torrent) getConnPieceInclination() []int {
1023         _ret := t.connPieceInclinationPool.Get()
1024         if _ret == nil {
1025                 pieceInclinationsNew.Add(1)
1026                 return rand.Perm(t.numPieces())
1027         }
1028         pieceInclinationsReused.Add(1)
1029         return _ret.([]int)
1030 }
1031
1032 func (t *Torrent) putPieceInclination(pi []int) {
1033         t.connPieceInclinationPool.Put(pi)
1034         pieceInclinationsPut.Add(1)
1035 }
1036
1037 func (t *Torrent) updatePieceCompletion(piece int) {
1038         pcu := t.pieceCompleteUncached(piece)
1039         changed := t.completedPieces.Get(piece) != pcu
1040         t.completedPieces.Set(piece, pcu)
1041         if changed {
1042                 t.pieceCompletionChanged(piece)
1043         }
1044 }
1045
1046 // Non-blocking read. Client lock is not required.
1047 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1048         p := &t.pieces[off/t.info.PieceLength]
1049         p.waitNoPendingWrites()
1050         return p.Storage().ReadAt(b, off-p.Info().Offset())
1051 }
1052
1053 func (t *Torrent) updateAllPieceCompletions() {
1054         for i := range iter.N(t.numPieces()) {
1055                 t.updatePieceCompletion(i)
1056         }
1057 }
1058
1059 // Returns an error if the metadata was completed, but couldn't be set for
1060 // some reason. Blame it on the last peer to contribute.
1061 func (t *Torrent) maybeCompleteMetadata() error {
1062         if t.haveInfo() {
1063                 // Nothing to do.
1064                 return nil
1065         }
1066         if !t.haveAllMetadataPieces() {
1067                 // Don't have enough metadata pieces.
1068                 return nil
1069         }
1070         err := t.setInfoBytes(t.metadataBytes)
1071         if err != nil {
1072                 t.invalidateMetadata()
1073                 return fmt.Errorf("error setting info bytes: %s", err)
1074         }
1075         if t.cl.config.Debug {
1076                 log.Printf("%s: got metadata from peers", t)
1077         }
1078         return nil
1079 }
1080
1081 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1082         t.forReaderOffsetPieces(func(begin, end int) bool {
1083                 ret.AddRange(begin, end)
1084                 return true
1085         })
1086         return
1087 }
1088
1089 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1090         t.forReaderOffsetPieces(func(begin, end int) bool {
1091                 if end > begin {
1092                         now.Add(begin)
1093                         readahead.AddRange(begin+1, end)
1094                 }
1095                 return true
1096         })
1097         return
1098 }
1099
1100 func (t *Torrent) needData() bool {
1101         if t.closed.IsSet() {
1102                 return false
1103         }
1104         if !t.haveInfo() {
1105                 return true
1106         }
1107         if t.pendingPieces.Len() != 0 {
1108                 return true
1109         }
1110         // Read as "not all complete".
1111         return !t.readerPieces().IterTyped(func(piece int) bool {
1112                 return t.pieceComplete(piece)
1113         })
1114 }
1115
1116 func appendMissingStrings(old, new []string) (ret []string) {
1117         ret = old
1118 new:
1119         for _, n := range new {
1120                 for _, o := range old {
1121                         if o == n {
1122                                 continue new
1123                         }
1124                 }
1125                 ret = append(ret, n)
1126         }
1127         return
1128 }
1129
1130 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1131         ret = existing
1132         for minNumTiers > len(ret) {
1133                 ret = append(ret, nil)
1134         }
1135         return
1136 }
1137
1138 func (t *Torrent) addTrackers(announceList [][]string) {
1139         fullAnnounceList := &t.metainfo.AnnounceList
1140         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1141         for tierIndex, trackerURLs := range announceList {
1142                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1143         }
1144         t.startMissingTrackerScrapers()
1145         t.updateWantPeersEvent()
1146 }
1147
1148 // Don't call this before the info is available.
1149 func (t *Torrent) bytesCompleted() int64 {
1150         if !t.haveInfo() {
1151                 return 0
1152         }
1153         return t.info.TotalLength() - t.bytesLeft()
1154 }
1155
1156 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1157         t.cl.mu.Lock()
1158         defer t.cl.mu.Unlock()
1159         return t.setInfoBytes(b)
1160 }
1161
1162 // Returns true if connection is removed from torrent.Conns.
1163 func (t *Torrent) deleteConnection(c *connection) (ret bool) {
1164         _, ret = t.conns[c]
1165         delete(t.conns, c)
1166         return
1167 }
1168
1169 func (t *Torrent) dropConnection(c *connection) {
1170         t.cl.event.Broadcast()
1171         c.Close()
1172         if t.deleteConnection(c) {
1173                 t.openNewConns()
1174         }
1175 }
1176
1177 func (t *Torrent) wantPeers() bool {
1178         if t.closed.IsSet() {
1179                 return false
1180         }
1181         if len(t.peers) > torrentPeersLowWater {
1182                 return false
1183         }
1184         return t.needData() || t.seeding()
1185 }
1186
1187 func (t *Torrent) updateWantPeersEvent() {
1188         if t.wantPeers() {
1189                 t.wantPeersEvent.Set()
1190         } else {
1191                 t.wantPeersEvent.Clear()
1192         }
1193 }
1194
1195 // Returns whether the client should make effort to seed the torrent.
1196 func (t *Torrent) seeding() bool {
1197         cl := t.cl
1198         if t.closed.IsSet() {
1199                 return false
1200         }
1201         if cl.config.NoUpload {
1202                 return false
1203         }
1204         if !cl.config.Seed {
1205                 return false
1206         }
1207         if t.needData() {
1208                 return false
1209         }
1210         return true
1211 }
1212
1213 // Adds and starts tracker scrapers for tracker URLs that aren't already
1214 // running.
1215 func (t *Torrent) startMissingTrackerScrapers() {
1216         if t.cl.config.DisableTrackers {
1217                 return
1218         }
1219         for _, tier := range t.announceList() {
1220                 for _, trackerURL := range tier {
1221                         if _, ok := t.trackerAnnouncers[trackerURL]; ok {
1222                                 continue
1223                         }
1224                         newAnnouncer := &trackerScraper{
1225                                 url: trackerURL,
1226                                 t:   t,
1227                         }
1228                         if t.trackerAnnouncers == nil {
1229                                 t.trackerAnnouncers = make(map[string]*trackerScraper)
1230                         }
1231                         t.trackerAnnouncers[trackerURL] = newAnnouncer
1232                         go newAnnouncer.Run()
1233                 }
1234         }
1235 }
1236
1237 // Returns an AnnounceRequest with fields filled out to defaults and current
1238 // values.
1239 func (t *Torrent) announceRequest() tracker.AnnounceRequest {
1240         return tracker.AnnounceRequest{
1241                 Event:    tracker.None,
1242                 NumWant:  -1,
1243                 Port:     uint16(t.cl.incomingPeerPort()),
1244                 PeerId:   t.cl.peerID,
1245                 InfoHash: t.infoHash,
1246                 Left:     t.bytesLeftAnnounce(),
1247         }
1248 }
1249
1250 // Adds peers revealed in an announce until the announce ends, or we have
1251 // enough peers.
1252 func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
1253         cl := t.cl
1254         // Count all the unique addresses we got during this announce.
1255         allAddrs := make(map[string]struct{})
1256         for {
1257                 select {
1258                 case v, ok := <-pvs:
1259                         if !ok {
1260                                 return
1261                         }
1262                         addPeers := make([]Peer, 0, len(v.Peers))
1263                         for _, cp := range v.Peers {
1264                                 if cp.Port == 0 {
1265                                         // Can't do anything with this.
1266                                         continue
1267                                 }
1268                                 addPeers = append(addPeers, Peer{
1269                                         IP:     cp.IP[:],
1270                                         Port:   cp.Port,
1271                                         Source: peerSourceDHT,
1272                                 })
1273                                 key := (&net.UDPAddr{
1274                                         IP:   cp.IP[:],
1275                                         Port: cp.Port,
1276                                 }).String()
1277                                 allAddrs[key] = struct{}{}
1278                         }
1279                         cl.mu.Lock()
1280                         t.addPeers(addPeers)
1281                         numPeers := len(t.peers)
1282                         cl.mu.Unlock()
1283                         if numPeers >= torrentPeersHighWater {
1284                                 return
1285                         }
1286                 case <-t.closed.LockedChan(&cl.mu):
1287                         return
1288                 }
1289         }
1290 }
1291
1292 func (t *Torrent) announceDHT(impliedPort bool) (err error) {
1293         cl := t.cl
1294         ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
1295         if err != nil {
1296                 return
1297         }
1298         t.consumeDHTAnnounce(ps.Peers)
1299         ps.Close()
1300         return
1301 }
1302
1303 func (t *Torrent) dhtAnnouncer() {
1304         cl := t.cl
1305         for {
1306                 select {
1307                 case <-t.wantPeersEvent.LockedChan(&cl.mu):
1308                 case <-t.closed.LockedChan(&cl.mu):
1309                         return
1310                 }
1311                 err := t.announceDHT(true)
1312                 if err == nil {
1313                         cl.mu.Lock()
1314                         t.numDHTAnnounces++
1315                         cl.mu.Unlock()
1316                 } else {
1317                         log.Printf("error announcing %q to DHT: %s", t, err)
1318                 }
1319                 select {
1320                 case <-t.closed.LockedChan(&cl.mu):
1321                         return
1322                 case <-time.After(5 * time.Minute):
1323                 }
1324         }
1325 }
1326
1327 func (t *Torrent) addPeers(peers []Peer) {
1328         for _, p := range peers {
1329                 if t.cl.badPeerIPPort(p.IP, p.Port) {
1330                         continue
1331                 }
1332                 t.addPeer(p)
1333         }
1334 }
1335
1336 func (t *Torrent) Stats() TorrentStats {
1337         t.cl.mu.Lock()
1338         defer t.cl.mu.Unlock()
1339         return t.stats
1340 }
1341
1342 // Returns true if the connection is added.
1343 func (t *Torrent) addConnection(c *connection) bool {
1344         if t.cl.closed.IsSet() {
1345                 return false
1346         }
1347         if !t.wantConns() {
1348                 return false
1349         }
1350         for c0 := range t.conns {
1351                 if c.PeerID == c0.PeerID {
1352                         // Already connected to a client with that ID.
1353                         duplicateClientConns.Add(1)
1354                         return false
1355                 }
1356         }
1357         if len(t.conns) >= t.maxEstablishedConns {
1358                 c := t.worstBadConn()
1359                 if c == nil {
1360                         return false
1361                 }
1362                 if t.cl.config.Debug && missinggo.CryHeard() {
1363                         log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
1364                 }
1365                 c.Close()
1366                 t.deleteConnection(c)
1367         }
1368         if len(t.conns) >= t.maxEstablishedConns {
1369                 panic(len(t.conns))
1370         }
1371         if c.t != nil {
1372                 panic("connection already associated with a torrent")
1373         }
1374         // Reconcile bytes transferred before connection was associated with a
1375         // torrent.
1376         t.stats.wroteBytes(c.stats.BytesWritten)
1377         t.stats.readBytes(c.stats.BytesRead)
1378         c.t = t
1379         t.conns[c] = struct{}{}
1380         return true
1381 }
1382
1383 func (t *Torrent) wantConns() bool {
1384         if t.closed.IsSet() {
1385                 return false
1386         }
1387         if !t.seeding() && !t.needData() {
1388                 return false
1389         }
1390         if len(t.conns) < t.maxEstablishedConns {
1391                 return true
1392         }
1393         return t.worstBadConn() != nil
1394 }
1395
1396 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1397         t.cl.mu.Lock()
1398         defer t.cl.mu.Unlock()
1399         oldMax = t.maxEstablishedConns
1400         t.maxEstablishedConns = max
1401         wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
1402         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1403                 t.dropConnection(wcs.Pop().(*connection))
1404         }
1405         t.openNewConns()
1406         return oldMax
1407 }
1408
1409 func (t *Torrent) mu() missinggo.RWLocker {
1410         return &t.cl.mu
1411 }
1412
1413 func (t *Torrent) pieceHashed(piece int, correct bool) {
1414         if t.closed.IsSet() {
1415                 return
1416         }
1417         p := &t.pieces[piece]
1418         touchers := t.reapPieceTouchers(piece)
1419         if p.EverHashed {
1420                 // Don't score the first time a piece is hashed, it could be an
1421                 // initial check.
1422                 if correct {
1423                         pieceHashedCorrect.Add(1)
1424                 } else {
1425                         log.Printf("%s: piece %d (%x) failed hash: %d connections contributed", t, piece, p.Hash, len(touchers))
1426                         pieceHashedNotCorrect.Add(1)
1427                 }
1428         }
1429         p.EverHashed = true
1430         if correct {
1431                 for _, c := range touchers {
1432                         c.goodPiecesDirtied++
1433                 }
1434                 err := p.Storage().MarkComplete()
1435                 if err != nil {
1436                         log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
1437                 }
1438                 t.updatePieceCompletion(piece)
1439         } else if len(touchers) != 0 {
1440                 for _, c := range touchers {
1441                         // Y u do dis peer?!
1442                         c.badPiecesDirtied++
1443                 }
1444                 slices.Sort(touchers, connLessTrusted)
1445                 log.Printf("dropping first corresponding conn from trust: %s", func() (ret []int) {
1446                         for _, c := range touchers {
1447                                 ret = append(ret, c.netGoodPiecesDirtied())
1448                         }
1449                         return
1450                 })
1451                 c := touchers[0]
1452                 t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
1453                 c.Drop()
1454         }
1455 }
1456
1457 func (t *Torrent) onPieceCompleted(piece int) {
1458         t.pendingPieces.Remove(piece)
1459         t.pendAllChunkSpecs(piece)
1460         for conn := range t.conns {
1461                 conn.Have(piece)
1462                 for r := range conn.Requests {
1463                         if int(r.Index) == piece {
1464                                 conn.Cancel(r)
1465                         }
1466                 }
1467                 // Could check here if peer doesn't have piece, but due to caching
1468                 // some peers may have said they have a piece but they don't.
1469                 conn.upload()
1470         }
1471 }
1472
1473 func (t *Torrent) onIncompletePiece(piece int) {
1474         if t.pieceAllDirty(piece) {
1475                 t.pendAllChunkSpecs(piece)
1476         }
1477         if !t.wantPieceIndex(piece) {
1478                 return
1479         }
1480         // We could drop any connections that we told we have a piece that we
1481         // don't here. But there's a test failure, and it seems clients don't care
1482         // if you request pieces that you already claim to have. Pruning bad
1483         // connections might just remove any connections that aren't treating us
1484         // favourably anyway.
1485         // for c := range t.conns {
1486         //      if c.sentHave(piece) {
1487         //              c.Drop()
1488         //      }
1489         // }
1490         for conn := range t.conns {
1491                 if conn.PeerHasPiece(piece) {
1492                         conn.updateRequests()
1493                 }
1494         }
1495 }
1496
1497 func (t *Torrent) verifyPiece(piece int) {
1498         cl := t.cl
1499         cl.mu.Lock()
1500         defer cl.mu.Unlock()
1501         p := &t.pieces[piece]
1502         for p.Hashing || t.storage == nil {
1503                 cl.event.Wait()
1504         }
1505         p.QueuedForHash = false
1506         if t.closed.IsSet() || t.pieceComplete(piece) {
1507                 t.updatePiecePriority(piece)
1508                 return
1509         }
1510         p.Hashing = true
1511         t.publishPieceChange(piece)
1512         cl.mu.Unlock()
1513         sum := t.hashPiece(piece)
1514         cl.mu.Lock()
1515         p.Hashing = false
1516         t.pieceHashed(piece, sum == p.Hash)
1517 }
1518
1519 // Return the connections that touched a piece, and clear the entry while
1520 // doing it.
1521 func (t *Torrent) reapPieceTouchers(piece int) (ret []*connection) {
1522         for c := range t.conns {
1523                 if _, ok := c.peerTouchedPieces[piece]; ok {
1524                         ret = append(ret, c)
1525                         delete(c.peerTouchedPieces, piece)
1526                 }
1527         }
1528         return
1529 }
1530
1531 func (t *Torrent) connsAsSlice() (ret []*connection) {
1532         for c := range t.conns {
1533                 ret = append(ret, c)
1534         }
1535         return
1536 }