]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
More development of the new logging interface
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "crypto/sha1"
6         "errors"
7         "fmt"
8         "io"
9         "math"
10         "math/rand"
11         "net"
12         "os"
13         "strconv"
14         "sync"
15         "text/tabwriter"
16         "time"
17
18         "github.com/anacrolix/log"
19
20         "github.com/davecgh/go-spew/spew"
21
22         "github.com/anacrolix/dht"
23         "github.com/anacrolix/missinggo"
24         "github.com/anacrolix/missinggo/bitmap"
25         "github.com/anacrolix/missinggo/perf"
26         "github.com/anacrolix/missinggo/prioritybitmap"
27         "github.com/anacrolix/missinggo/pubsub"
28         "github.com/anacrolix/missinggo/slices"
29         "github.com/bradfitz/iter"
30
31         "github.com/anacrolix/torrent/bencode"
32         "github.com/anacrolix/torrent/metainfo"
33         pp "github.com/anacrolix/torrent/peer_protocol"
34         "github.com/anacrolix/torrent/storage"
35         "github.com/anacrolix/torrent/tracker"
36 )
37
38 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
39         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
40 }
41
42 type peersKey struct {
43         IPBytes string
44         Port    int
45 }
46
47 // Maintains state of torrent within a Client.
48 type Torrent struct {
49         cl     *Client
50         logger *log.Logger
51
52         networkingEnabled bool
53         // Determines what chunks to request from peers. 1: Favour higher priority
54         // pieces with some fuzzing to reduce overlaps and wastage across
55         // connections. 2: The fastest connection downloads strictly in order of
56         // priority, while all others adher to their piece inclications.
57         requestStrategy int
58
59         closed   missinggo.Event
60         infoHash metainfo.Hash
61         pieces   []Piece
62         // Values are the piece indices that changed.
63         pieceStateChanges *pubsub.PubSub
64         // The size of chunks to request from peers over the wire. This is
65         // normally 16KiB by convention these days.
66         chunkSize pp.Integer
67         chunkPool *sync.Pool
68         // Total length of the torrent in bytes. Stored because it's not O(1) to
69         // get this from the info dict.
70         length *int64
71
72         // The storage to open when the info dict becomes available.
73         storageOpener *storage.Client
74         // Storage for torrent data.
75         storage *storage.Torrent
76         // Read-locked for using storage, and write-locked for Closing.
77         storageLock sync.RWMutex
78
79         metainfo metainfo.MetaInfo
80
81         // The info dict. nil if we don't have it (yet).
82         info  *metainfo.Info
83         files *[]*File
84
85         // Active peer connections, running message stream loops.
86         conns               map[*connection]struct{}
87         maxEstablishedConns int
88         // Set of addrs to which we're attempting to connect. Connections are
89         // half-open until all handshakes are completed.
90         halfOpen    map[string]Peer
91         fastestConn *connection
92
93         // Reserve of peers to connect to. A peer can be both here and in the
94         // active connections if were told about the peer after connecting with
95         // them. That encourages us to reconnect to peers that are well known in
96         // the swarm.
97         peers          map[peersKey]Peer
98         wantPeersEvent missinggo.Event
99         // An announcer for each tracker URL.
100         trackerAnnouncers map[string]*trackerScraper
101         // How many times we've initiated a DHT announce. TODO: Move into stats.
102         numDHTAnnounces int
103
104         // Name used if the info name isn't available. Should be cleared when the
105         // Info does become available.
106         displayName string
107
108         // The bencoded bytes of the info dict. This is actively manipulated if
109         // the info bytes aren't initially available, and we try to fetch them
110         // from peers.
111         metadataBytes []byte
112         // Each element corresponds to the 16KiB metadata pieces. If true, we have
113         // received that piece.
114         metadataCompletedChunks []bool
115         metadataChanged         sync.Cond
116
117         // Set when .Info is obtained.
118         gotMetainfo missinggo.Event
119
120         readers               map[*reader]struct{}
121         readerNowPieces       bitmap.Bitmap
122         readerReadaheadPieces bitmap.Bitmap
123
124         // A cache of pieces we need to get. Calculated from various piece and
125         // file priorities and completion states elsewhere.
126         pendingPieces prioritybitmap.PriorityBitmap
127         // A cache of completed piece indices.
128         completedPieces bitmap.Bitmap
129         // Pieces that need to be hashed.
130         piecesQueuedForHash bitmap.Bitmap
131
132         // A pool of piece priorities []int for assignment to new connections.
133         // These "inclinations" are used to give connections preference for
134         // different pieces.
135         connPieceInclinationPool sync.Pool
136         // Torrent-level statistics.
137         stats TorrentStats
138 }
139
140 // Returns a channel that is closed when the Torrent is closed.
141 func (t *Torrent) Closed() <-chan struct{} {
142         return t.closed.LockedChan(&t.cl.mu)
143 }
144
145 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
146 // pending, and half-open peers.
147 func (t *Torrent) KnownSwarm() (ks []Peer) {
148         // Add pending peers to the list
149         for _, peer := range t.peers {
150                 ks = append(ks, peer)
151         }
152
153         // Add half-open peers to the list
154         for _, peer := range t.halfOpen {
155                 ks = append(ks, peer)
156         }
157
158         // Add active peers to the list
159         for conn := range t.conns {
160                 host, portString, err := net.SplitHostPort(conn.remoteAddr().String())
161                 if err != nil {
162                         panic(err)
163                 }
164
165                 ip := net.ParseIP(host)
166                 port, err := strconv.Atoi(portString)
167                 if err != nil {
168                         panic(err)
169                 }
170
171                 ks = append(ks, Peer{
172                         Id:     conn.PeerID,
173                         IP:     ip,
174                         Port:   port,
175                         Source: conn.Discovery,
176                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
177                         // > But if we're not connected to them with an encrypted connection, I couldn't say
178                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
179                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
180                         // > connection. It's probably easiest to do the latter for now.
181                         // https://github.com/anacrolix/torrent/pull/188
182                         SupportsEncryption: conn.headerEncrypted,
183                 })
184         }
185
186         return
187 }
188
189 func (t *Torrent) setChunkSize(size pp.Integer) {
190         t.chunkSize = size
191         t.chunkPool = &sync.Pool{
192                 New: func() interface{} {
193                         b := make([]byte, size)
194                         return &b
195                 },
196         }
197 }
198
199 func (t *Torrent) setDisplayName(dn string) {
200         if t.haveInfo() {
201                 return
202         }
203         t.displayName = dn
204 }
205
206 func (t *Torrent) pieceComplete(piece int) bool {
207         return t.completedPieces.Get(piece)
208 }
209
210 func (t *Torrent) pieceCompleteUncached(piece int) storage.Completion {
211         return t.pieces[piece].Storage().Completion()
212 }
213
214 // There's a connection to that address already.
215 func (t *Torrent) addrActive(addr string) bool {
216         if _, ok := t.halfOpen[addr]; ok {
217                 return true
218         }
219         for c := range t.conns {
220                 ra := c.remoteAddr()
221                 if ra == nil {
222                         continue
223                 }
224                 if ra.String() == addr {
225                         return true
226                 }
227         }
228         return false
229 }
230
231 func (t *Torrent) unclosedConnsAsSlice() (ret []*connection) {
232         ret = make([]*connection, 0, len(t.conns))
233         for c := range t.conns {
234                 if !c.closed.IsSet() {
235                         ret = append(ret, c)
236                 }
237         }
238         return
239 }
240
241 func (t *Torrent) addPeer(p Peer) {
242         cl := t.cl
243         cl.openNewConns(t)
244         if len(t.peers) >= cl.config.TorrentPeersHighWater {
245                 return
246         }
247         key := peersKey{string(p.IP), p.Port}
248         if _, ok := t.peers[key]; ok {
249                 return
250         }
251         t.peers[key] = p
252         peersAddedBySource.Add(string(p.Source), 1)
253         cl.openNewConns(t)
254
255 }
256
257 func (t *Torrent) invalidateMetadata() {
258         for i := range t.metadataCompletedChunks {
259                 t.metadataCompletedChunks[i] = false
260         }
261         t.info = nil
262 }
263
264 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
265         if t.haveInfo() {
266                 return
267         }
268         if index >= len(t.metadataCompletedChunks) {
269                 log.Printf("%s: ignoring metadata piece %d", t, index)
270                 return
271         }
272         copy(t.metadataBytes[(1<<14)*index:], data)
273         t.metadataCompletedChunks[index] = true
274 }
275
276 func (t *Torrent) metadataPieceCount() int {
277         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
278 }
279
280 func (t *Torrent) haveMetadataPiece(piece int) bool {
281         if t.haveInfo() {
282                 return (1<<14)*piece < len(t.metadataBytes)
283         } else {
284                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
285         }
286 }
287
288 func (t *Torrent) metadataSizeKnown() bool {
289         return t.metadataBytes != nil
290 }
291
292 func (t *Torrent) metadataSize() int {
293         return len(t.metadataBytes)
294 }
295
296 func infoPieceHashes(info *metainfo.Info) (ret []string) {
297         for i := 0; i < len(info.Pieces); i += sha1.Size {
298                 ret = append(ret, string(info.Pieces[i:i+sha1.Size]))
299         }
300         return
301 }
302
303 func (t *Torrent) makePieces() {
304         hashes := infoPieceHashes(t.info)
305         t.pieces = make([]Piece, len(hashes))
306         for i, hash := range hashes {
307                 piece := &t.pieces[i]
308                 piece.t = t
309                 piece.index = i
310                 piece.noPendingWrites.L = &piece.pendingWritesMutex
311                 missinggo.CopyExact(piece.hash[:], hash)
312                 files := *t.files
313                 beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
314                 endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
315                 piece.files = files[beginFile:endFile]
316         }
317 }
318
319 // Returns the index of the first file containing the piece. files must be
320 // ordered by offset.
321 func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
322         for i, f := range files {
323                 if f.offset+f.length > pieceOffset {
324                         return i
325                 }
326         }
327         return 0
328 }
329
330 // Returns the index after the last file containing the piece. files must be
331 // ordered by offset.
332 func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
333         for i, f := range files {
334                 if f.offset+f.length >= pieceEndOffset {
335                         return i + 1
336                 }
337         }
338         return 0
339 }
340
341 func (t *Torrent) cacheLength() {
342         var l int64
343         for _, f := range t.info.UpvertedFiles() {
344                 l += f.Length
345         }
346         t.length = &l
347 }
348
349 func (t *Torrent) setInfo(info *metainfo.Info) error {
350         if err := validateInfo(info); err != nil {
351                 return fmt.Errorf("bad info: %s", err)
352         }
353         if t.storageOpener != nil {
354                 var err error
355                 t.storage, err = t.storageOpener.OpenTorrent(info, t.infoHash)
356                 if err != nil {
357                         return fmt.Errorf("error opening torrent storage: %s", err)
358                 }
359         }
360         t.info = info
361         t.displayName = "" // Save a few bytes lol.
362         t.initFiles()
363         t.cacheLength()
364         t.makePieces()
365         return nil
366 }
367
368 func (t *Torrent) onSetInfo() {
369         for conn := range t.conns {
370                 if err := conn.setNumPieces(t.numPieces()); err != nil {
371                         log.Printf("closing connection: %s", err)
372                         conn.Close()
373                 }
374         }
375         for i := range t.pieces {
376                 t.updatePieceCompletion(i)
377                 p := &t.pieces[i]
378                 if !p.storageCompletionOk {
379                         // log.Printf("piece %s completion unknown, queueing check", p)
380                         t.queuePieceCheck(i)
381                 }
382         }
383         t.cl.event.Broadcast()
384         t.gotMetainfo.Set()
385         t.updateWantPeersEvent()
386 }
387
388 // Called when metadata for a torrent becomes available.
389 func (t *Torrent) setInfoBytes(b []byte) error {
390         if metainfo.HashBytes(b) != t.infoHash {
391                 return errors.New("info bytes have wrong hash")
392         }
393         var info metainfo.Info
394         if err := bencode.Unmarshal(b, &info); err != nil {
395                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
396         }
397         if err := t.setInfo(&info); err != nil {
398                 return err
399         }
400         t.metadataBytes = b
401         t.metadataCompletedChunks = nil
402         t.onSetInfo()
403         return nil
404 }
405
406 func (t *Torrent) haveAllMetadataPieces() bool {
407         if t.haveInfo() {
408                 return true
409         }
410         if t.metadataCompletedChunks == nil {
411                 return false
412         }
413         for _, have := range t.metadataCompletedChunks {
414                 if !have {
415                         return false
416                 }
417         }
418         return true
419 }
420
421 // TODO: Propagate errors to disconnect peer.
422 func (t *Torrent) setMetadataSize(bytes int64) (err error) {
423         if t.haveInfo() {
424                 // We already know the correct metadata size.
425                 return
426         }
427         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
428                 return errors.New("bad size")
429         }
430         if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
431                 return
432         }
433         t.metadataBytes = make([]byte, bytes)
434         t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
435         t.metadataChanged.Broadcast()
436         for c := range t.conns {
437                 c.requestPendingMetadata()
438         }
439         return
440 }
441
442 // The current working name for the torrent. Either the name in the info dict,
443 // or a display name given such as by the dn value in a magnet link, or "".
444 func (t *Torrent) name() string {
445         if t.haveInfo() {
446                 return t.info.Name
447         }
448         return t.displayName
449 }
450
451 func (t *Torrent) pieceState(index int) (ret PieceState) {
452         p := &t.pieces[index]
453         ret.Priority = t.piecePriority(index)
454         ret.Completion = p.completion()
455         if p.queuedForHash() || p.hashing {
456                 ret.Checking = true
457         }
458         if !ret.Complete && t.piecePartiallyDownloaded(index) {
459                 ret.Partial = true
460         }
461         return
462 }
463
464 func (t *Torrent) metadataPieceSize(piece int) int {
465         return metadataPieceSize(len(t.metadataBytes), piece)
466 }
467
468 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
469         d := map[string]int{
470                 "msg_type": msgType,
471                 "piece":    piece,
472         }
473         if data != nil {
474                 d["total_size"] = len(t.metadataBytes)
475         }
476         p, err := bencode.Marshal(d)
477         if err != nil {
478                 panic(err)
479         }
480         return pp.Message{
481                 Type:            pp.Extended,
482                 ExtendedID:      c.PeerExtensionIDs["ut_metadata"],
483                 ExtendedPayload: append(p, data...),
484         }
485 }
486
487 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
488         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
489                 ret = append(ret, PieceStateRun{
490                         PieceState: el.(PieceState),
491                         Length:     int(count),
492                 })
493         })
494         for index := range t.pieces {
495                 rle.Append(t.pieceState(index), 1)
496         }
497         rle.Flush()
498         return
499 }
500
501 // Produces a small string representing a PieceStateRun.
502 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
503         ret = fmt.Sprintf("%d", psr.Length)
504         ret += func() string {
505                 switch psr.Priority {
506                 case PiecePriorityNext:
507                         return "N"
508                 case PiecePriorityNormal:
509                         return "."
510                 case PiecePriorityReadahead:
511                         return "R"
512                 case PiecePriorityNow:
513                         return "!"
514                 case PiecePriorityHigh:
515                         return "H"
516                 default:
517                         return ""
518                 }
519         }()
520         if psr.Checking {
521                 ret += "H"
522         }
523         if psr.Partial {
524                 ret += "P"
525         }
526         if psr.Complete {
527                 ret += "C"
528         }
529         if !psr.Ok {
530                 ret += "?"
531         }
532         return
533 }
534
535 func (t *Torrent) writeStatus(w io.Writer) {
536         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
537         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
538         if !t.haveInfo() {
539                 fmt.Fprintf(w, "Metadata have: ")
540                 for _, h := range t.metadataCompletedChunks {
541                         fmt.Fprintf(w, "%c", func() rune {
542                                 if h {
543                                         return 'H'
544                                 } else {
545                                         return '.'
546                                 }
547                         }())
548                 }
549                 fmt.Fprintln(w)
550         }
551         fmt.Fprintf(w, "Piece length: %s\n", func() string {
552                 if t.haveInfo() {
553                         return fmt.Sprint(t.usualPieceSize())
554                 } else {
555                         return "?"
556                 }
557         }())
558         if t.info != nil {
559                 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
560                 fmt.Fprint(w, "Piece States:")
561                 for _, psr := range t.pieceStateRuns() {
562                         w.Write([]byte(" "))
563                         w.Write([]byte(pieceStateRunStatusChars(psr)))
564                 }
565                 fmt.Fprintln(w)
566         }
567         fmt.Fprintf(w, "Reader Pieces:")
568         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
569                 fmt.Fprintf(w, " %d:%d", begin, end)
570                 return true
571         })
572         fmt.Fprintln(w)
573
574         fmt.Fprintf(w, "Trackers:\n")
575         func() {
576                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
577                 fmt.Fprintf(tw, "    URL\tNext announce\tLast announce\n")
578                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
579                         return l.url < r.url
580                 }).([]*trackerScraper) {
581                         fmt.Fprintf(tw, "    %s\n", ta.statusLine())
582                 }
583                 tw.Flush()
584         }()
585
586         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
587
588         spew.NewDefaultConfig()
589         spew.Fdump(w, t.statsLocked())
590
591         conns := t.connsAsSlice()
592         slices.Sort(conns, worseConn)
593         for i, c := range conns {
594                 fmt.Fprintf(w, "%2d. ", i+1)
595                 c.WriteStatus(w, t)
596         }
597 }
598
599 func (t *Torrent) haveInfo() bool {
600         return t.info != nil
601 }
602
603 // Returns a run-time generated MetaInfo that includes the info bytes and
604 // announce-list as currently known to the client.
605 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
606         return metainfo.MetaInfo{
607                 CreationDate: time.Now().Unix(),
608                 Comment:      "dynamic metainfo from client",
609                 CreatedBy:    "go.torrent",
610                 AnnounceList: t.metainfo.UpvertedAnnounceList(),
611                 InfoBytes: func() []byte {
612                         if t.haveInfo() {
613                                 return t.metadataBytes
614                         } else {
615                                 return nil
616                         }
617                 }(),
618         }
619 }
620
621 func (t *Torrent) BytesMissing() int64 {
622         t.mu().RLock()
623         defer t.mu().RUnlock()
624         return t.bytesMissingLocked()
625 }
626
627 func (t *Torrent) bytesMissingLocked() int64 {
628         return t.bytesLeft()
629 }
630
631 func (t *Torrent) bytesLeft() (left int64) {
632         bitmap.Flip(t.completedPieces, 0, t.numPieces()).IterTyped(func(piece int) bool {
633                 p := &t.pieces[piece]
634                 left += int64(p.length() - p.numDirtyBytes())
635                 return true
636         })
637         return
638 }
639
640 // Bytes left to give in tracker announces.
641 func (t *Torrent) bytesLeftAnnounce() uint64 {
642         if t.haveInfo() {
643                 return uint64(t.bytesLeft())
644         } else {
645                 return math.MaxUint64
646         }
647 }
648
649 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
650         if t.pieceComplete(piece) {
651                 return false
652         }
653         if t.pieceAllDirty(piece) {
654                 return false
655         }
656         return t.pieces[piece].hasDirtyChunks()
657 }
658
659 func (t *Torrent) usualPieceSize() int {
660         return int(t.info.PieceLength)
661 }
662
663 func (t *Torrent) numPieces() int {
664         return t.info.NumPieces()
665 }
666
667 func (t *Torrent) numPiecesCompleted() (num int) {
668         return t.completedPieces.Len()
669 }
670
671 func (t *Torrent) close() (err error) {
672         t.closed.Set()
673         if t.storage != nil {
674                 t.storageLock.Lock()
675                 t.storage.Close()
676                 t.storageLock.Unlock()
677         }
678         for conn := range t.conns {
679                 conn.Close()
680         }
681         t.cl.event.Broadcast()
682         t.pieceStateChanges.Close()
683         t.updateWantPeersEvent()
684         return
685 }
686
687 func (t *Torrent) requestOffset(r request) int64 {
688         return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
689 }
690
691 // Return the request that would include the given offset into the torrent
692 // data. Returns !ok if there is no such request.
693 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
694         return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
695 }
696
697 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
698         tr := perf.NewTimer()
699
700         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
701         if err == nil && n != len(data) {
702                 err = io.ErrShortWrite
703         }
704         if err == nil {
705                 tr.Mark("write chunk")
706         }
707         return
708 }
709
710 func (t *Torrent) bitfield() (bf []bool) {
711         bf = make([]bool, t.numPieces())
712         t.completedPieces.IterTyped(func(piece int) (again bool) {
713                 bf[piece] = true
714                 return true
715         })
716         return
717 }
718
719 func (t *Torrent) pieceNumChunks(piece int) int {
720         return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
721 }
722
723 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
724         t.pieces[pieceIndex].dirtyChunks.Clear()
725 }
726
727 type Peer struct {
728         Id     [20]byte
729         IP     net.IP
730         Port   int
731         Source peerSource
732         // Peer is known to support encryption.
733         SupportsEncryption bool
734 }
735
736 func (t *Torrent) pieceLength(piece int) pp.Integer {
737         if piece == t.numPieces()-1 {
738                 ret := pp.Integer(*t.length % t.info.PieceLength)
739                 if ret != 0 {
740                         return ret
741                 }
742         }
743         return pp.Integer(t.info.PieceLength)
744 }
745
746 func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
747         hash := pieceHash.New()
748         p := &t.pieces[piece]
749         p.waitNoPendingWrites()
750         ip := t.info.Piece(piece)
751         pl := ip.Length()
752         n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
753         if n == pl {
754                 missinggo.CopyExact(&ret, hash.Sum(nil))
755                 return
756         }
757         if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
758                 log.Printf("unexpected error hashing piece with %T: %s", t.storage.TorrentImpl, err)
759         }
760         return
761 }
762
763 func (t *Torrent) haveAnyPieces() bool {
764         for i := range t.pieces {
765                 if t.pieceComplete(i) {
766                         return true
767                 }
768         }
769         return false
770 }
771
772 func (t *Torrent) havePiece(index int) bool {
773         return t.haveInfo() && t.pieceComplete(index)
774 }
775
776 func (t *Torrent) haveChunk(r request) (ret bool) {
777         // defer func() {
778         //      log.Println("have chunk", r, ret)
779         // }()
780         if !t.haveInfo() {
781                 return false
782         }
783         if t.pieceComplete(int(r.Index)) {
784                 return true
785         }
786         p := &t.pieces[r.Index]
787         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
788 }
789
790 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
791         return int(cs.Begin / chunkSize)
792 }
793
794 func (t *Torrent) wantPiece(r request) bool {
795         if !t.wantPieceIndex(int(r.Index)) {
796                 return false
797         }
798         if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
799                 return true
800         }
801         // TODO: What about pieces that were wanted, but aren't now, and aren't
802         // completed either? That used to be done here.
803         return false
804 }
805
806 func (t *Torrent) wantPieceIndex(index int) bool {
807         if !t.haveInfo() {
808                 return false
809         }
810         if index < 0 || index >= t.numPieces() {
811                 return false
812         }
813         p := &t.pieces[index]
814         if p.queuedForHash() {
815                 return false
816         }
817         if p.hashing {
818                 return false
819         }
820         if t.pieceComplete(index) {
821                 return false
822         }
823         if t.pendingPieces.Contains(index) {
824                 return true
825         }
826         // log.Printf("piece %d not pending", index)
827         return !t.forReaderOffsetPieces(func(begin, end int) bool {
828                 return index < begin || index >= end
829         })
830 }
831
832 // The worst connection is one that hasn't been sent, or sent anything useful
833 // for the longest. A bad connection is one that usually sends us unwanted
834 // pieces, or has been in worser half of the established connections for more
835 // than a minute.
836 func (t *Torrent) worstBadConn() *connection {
837         wcs := worseConnSlice{t.unclosedConnsAsSlice()}
838         heap.Init(&wcs)
839         for wcs.Len() != 0 {
840                 c := heap.Pop(&wcs).(*connection)
841                 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
842                         return c
843                 }
844                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
845                         // Give connections 1 minute to prove themselves.
846                         if time.Since(c.completedHandshake) > time.Minute {
847                                 return c
848                         }
849                 }
850         }
851         return nil
852 }
853
854 type PieceStateChange struct {
855         Index int
856         PieceState
857 }
858
859 func (t *Torrent) publishPieceChange(piece int) {
860         cur := t.pieceState(piece)
861         p := &t.pieces[piece]
862         if cur != p.publicPieceState {
863                 p.publicPieceState = cur
864                 t.pieceStateChanges.Publish(PieceStateChange{
865                         piece,
866                         cur,
867                 })
868         }
869 }
870
871 func (t *Torrent) pieceNumPendingChunks(piece int) int {
872         if t.pieceComplete(piece) {
873                 return 0
874         }
875         return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
876 }
877
878 func (t *Torrent) pieceAllDirty(piece int) bool {
879         return t.pieces[piece].dirtyChunks.Len() == t.pieceNumChunks(piece)
880 }
881
882 func (t *Torrent) readersChanged() {
883         t.updateReaderPieces()
884         t.updateAllPiecePriorities()
885 }
886
887 func (t *Torrent) updateReaderPieces() {
888         t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities()
889 }
890
891 func (t *Torrent) readerPosChanged(from, to pieceRange) {
892         if from == to {
893                 return
894         }
895         t.updateReaderPieces()
896         // Order the ranges, high and low.
897         l, h := from, to
898         if l.begin > h.begin {
899                 l, h = h, l
900         }
901         if l.end < h.begin {
902                 // Two distinct ranges.
903                 t.updatePiecePriorities(l.begin, l.end)
904                 t.updatePiecePriorities(h.begin, h.end)
905         } else {
906                 // Ranges overlap.
907                 end := l.end
908                 if h.end > end {
909                         end = h.end
910                 }
911                 t.updatePiecePriorities(l.begin, end)
912         }
913 }
914
915 func (t *Torrent) maybeNewConns() {
916         // Tickle the accept routine.
917         t.cl.event.Broadcast()
918         t.openNewConns()
919 }
920
921 func (t *Torrent) piecePriorityChanged(piece int) {
922         // log.Printf("piece %d priority changed", piece)
923         for c := range t.conns {
924                 if c.updatePiecePriority(piece) {
925                         // log.Print("conn piece priority changed")
926                         c.updateRequests()
927                 }
928         }
929         t.maybeNewConns()
930         t.publishPieceChange(piece)
931 }
932
933 func (t *Torrent) updatePiecePriority(piece int) {
934         p := &t.pieces[piece]
935         newPrio := p.uncachedPriority()
936         // log.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
937         if newPrio == PiecePriorityNone {
938                 if !t.pendingPieces.Remove(piece) {
939                         return
940                 }
941         } else {
942                 if !t.pendingPieces.Set(piece, newPrio.BitmapPriority()) {
943                         return
944                 }
945         }
946         t.piecePriorityChanged(piece)
947 }
948
949 func (t *Torrent) updateAllPiecePriorities() {
950         t.updatePiecePriorities(0, len(t.pieces))
951 }
952
953 // Update all piece priorities in one hit. This function should have the same
954 // output as updatePiecePriority, but across all pieces.
955 func (t *Torrent) updatePiecePriorities(begin, end int) {
956         for i := begin; i < end; i++ {
957                 t.updatePiecePriority(i)
958         }
959 }
960
961 // Returns the range of pieces [begin, end) that contains the extent of bytes.
962 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
963         if off >= *t.length {
964                 return
965         }
966         if off < 0 {
967                 size += off
968                 off = 0
969         }
970         if size <= 0 {
971                 return
972         }
973         begin = int(off / t.info.PieceLength)
974         end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
975         if end > t.info.NumPieces() {
976                 end = t.info.NumPieces()
977         }
978         return
979 }
980
981 // Returns true if all iterations complete without breaking. Returns the read
982 // regions for all readers. The reader regions should not be merged as some
983 // callers depend on this method to enumerate readers.
984 func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
985         for r := range t.readers {
986                 p := r.pieces
987                 if p.begin >= p.end {
988                         continue
989                 }
990                 if !f(p.begin, p.end) {
991                         return false
992                 }
993         }
994         return true
995 }
996
997 func (t *Torrent) piecePriority(piece int) piecePriority {
998         prio, ok := t.pendingPieces.GetPriority(piece)
999         if !ok {
1000                 return PiecePriorityNone
1001         }
1002         return piecePriority(-prio)
1003 }
1004
1005 func (t *Torrent) pendRequest(req request) {
1006         ci := chunkIndex(req.chunkSpec, t.chunkSize)
1007         t.pieces[req.Index].pendChunkIndex(ci)
1008 }
1009
1010 func (t *Torrent) pieceCompletionChanged(piece int) {
1011         log.Call().Add("piece", piece).AddValue(debugLogValue).Log(t.logger)
1012         t.cl.event.Broadcast()
1013         if t.pieceComplete(piece) {
1014                 t.onPieceCompleted(piece)
1015         } else {
1016                 t.onIncompletePiece(piece)
1017         }
1018         t.updatePiecePriority(piece)
1019 }
1020
1021 func (t *Torrent) openNewConns() {
1022         t.cl.openNewConns(t)
1023 }
1024
1025 func (t *Torrent) getConnPieceInclination() []int {
1026         _ret := t.connPieceInclinationPool.Get()
1027         if _ret == nil {
1028                 pieceInclinationsNew.Add(1)
1029                 return rand.Perm(t.numPieces())
1030         }
1031         pieceInclinationsReused.Add(1)
1032         return *_ret.(*[]int)
1033 }
1034
1035 func (t *Torrent) putPieceInclination(pi []int) {
1036         t.connPieceInclinationPool.Put(&pi)
1037         pieceInclinationsPut.Add(1)
1038 }
1039
1040 func (t *Torrent) updatePieceCompletion(piece int) {
1041         pcu := t.pieceCompleteUncached(piece)
1042         p := &t.pieces[piece]
1043         changed := t.completedPieces.Get(piece) != pcu.Complete || p.storageCompletionOk != pcu.Ok
1044         log.Fmsg("piece %d completion: %v", piece, pcu.Ok).AddValue(debugLogValue).Log(t.logger)
1045         p.storageCompletionOk = pcu.Ok
1046         t.completedPieces.Set(piece, pcu.Complete)
1047         // log.Printf("piece %d uncached completion: %v", piece, pcu.Complete)
1048         // log.Printf("piece %d changed: %v", piece, changed)
1049         if changed {
1050                 t.pieceCompletionChanged(piece)
1051         }
1052 }
1053
1054 // Non-blocking read. Client lock is not required.
1055 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1056         p := &t.pieces[off/t.info.PieceLength]
1057         p.waitNoPendingWrites()
1058         return p.Storage().ReadAt(b, off-p.Info().Offset())
1059 }
1060
1061 func (t *Torrent) updateAllPieceCompletions() {
1062         for i := range iter.N(t.numPieces()) {
1063                 t.updatePieceCompletion(i)
1064         }
1065 }
1066
1067 // Returns an error if the metadata was completed, but couldn't be set for
1068 // some reason. Blame it on the last peer to contribute.
1069 func (t *Torrent) maybeCompleteMetadata() error {
1070         if t.haveInfo() {
1071                 // Nothing to do.
1072                 return nil
1073         }
1074         if !t.haveAllMetadataPieces() {
1075                 // Don't have enough metadata pieces.
1076                 return nil
1077         }
1078         err := t.setInfoBytes(t.metadataBytes)
1079         if err != nil {
1080                 t.invalidateMetadata()
1081                 return fmt.Errorf("error setting info bytes: %s", err)
1082         }
1083         if t.cl.config.Debug {
1084                 log.Printf("%s: got metadata from peers", t)
1085         }
1086         return nil
1087 }
1088
1089 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1090         t.forReaderOffsetPieces(func(begin, end int) bool {
1091                 ret.AddRange(begin, end)
1092                 return true
1093         })
1094         return
1095 }
1096
1097 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1098         t.forReaderOffsetPieces(func(begin, end int) bool {
1099                 if end > begin {
1100                         now.Add(begin)
1101                         readahead.AddRange(begin+1, end)
1102                 }
1103                 return true
1104         })
1105         return
1106 }
1107
1108 func (t *Torrent) needData() bool {
1109         if t.closed.IsSet() {
1110                 return false
1111         }
1112         if !t.haveInfo() {
1113                 return true
1114         }
1115         return t.pendingPieces.Len() != 0
1116 }
1117
1118 func appendMissingStrings(old, new []string) (ret []string) {
1119         ret = old
1120 new:
1121         for _, n := range new {
1122                 for _, o := range old {
1123                         if o == n {
1124                                 continue new
1125                         }
1126                 }
1127                 ret = append(ret, n)
1128         }
1129         return
1130 }
1131
1132 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1133         ret = existing
1134         for minNumTiers > len(ret) {
1135                 ret = append(ret, nil)
1136         }
1137         return
1138 }
1139
1140 func (t *Torrent) addTrackers(announceList [][]string) {
1141         fullAnnounceList := &t.metainfo.AnnounceList
1142         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1143         for tierIndex, trackerURLs := range announceList {
1144                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1145         }
1146         t.startMissingTrackerScrapers()
1147         t.updateWantPeersEvent()
1148 }
1149
1150 // Don't call this before the info is available.
1151 func (t *Torrent) bytesCompleted() int64 {
1152         if !t.haveInfo() {
1153                 return 0
1154         }
1155         return t.info.TotalLength() - t.bytesLeft()
1156 }
1157
1158 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1159         t.cl.mu.Lock()
1160         defer t.cl.mu.Unlock()
1161         return t.setInfoBytes(b)
1162 }
1163
1164 // Returns true if connection is removed from torrent.Conns.
1165 func (t *Torrent) deleteConnection(c *connection) (ret bool) {
1166         _, ret = t.conns[c]
1167         delete(t.conns, c)
1168         return
1169 }
1170
1171 func (t *Torrent) dropConnection(c *connection) {
1172         t.cl.event.Broadcast()
1173         c.Close()
1174         if t.deleteConnection(c) {
1175                 t.openNewConns()
1176         }
1177 }
1178
1179 func (t *Torrent) wantPeers() bool {
1180         if t.closed.IsSet() {
1181                 return false
1182         }
1183         if len(t.peers) > t.cl.config.TorrentPeersLowWater {
1184                 return false
1185         }
1186         return t.needData() || t.seeding()
1187 }
1188
1189 func (t *Torrent) updateWantPeersEvent() {
1190         if t.wantPeers() {
1191                 t.wantPeersEvent.Set()
1192         } else {
1193                 t.wantPeersEvent.Clear()
1194         }
1195 }
1196
1197 // Returns whether the client should make effort to seed the torrent.
1198 func (t *Torrent) seeding() bool {
1199         cl := t.cl
1200         if t.closed.IsSet() {
1201                 return false
1202         }
1203         if cl.config.NoUpload {
1204                 return false
1205         }
1206         if !cl.config.Seed {
1207                 return false
1208         }
1209         if cl.config.DisableAggressiveUpload && t.needData() {
1210                 return false
1211         }
1212         return true
1213 }
1214
1215 func (t *Torrent) startScrapingTracker(url string) {
1216         if url == "" {
1217                 return
1218         }
1219         if _, ok := t.trackerAnnouncers[url]; ok {
1220                 return
1221         }
1222         newAnnouncer := &trackerScraper{
1223                 url: url,
1224                 t:   t,
1225         }
1226         if t.trackerAnnouncers == nil {
1227                 t.trackerAnnouncers = make(map[string]*trackerScraper)
1228         }
1229         t.trackerAnnouncers[url] = newAnnouncer
1230         go newAnnouncer.Run()
1231 }
1232
1233 // Adds and starts tracker scrapers for tracker URLs that aren't already
1234 // running.
1235 func (t *Torrent) startMissingTrackerScrapers() {
1236         if t.cl.config.DisableTrackers {
1237                 return
1238         }
1239         t.startScrapingTracker(t.metainfo.Announce)
1240         for _, tier := range t.metainfo.AnnounceList {
1241                 for _, url := range tier {
1242                         t.startScrapingTracker(url)
1243                 }
1244         }
1245 }
1246
1247 // Returns an AnnounceRequest with fields filled out to defaults and current
1248 // values.
1249 func (t *Torrent) announceRequest() tracker.AnnounceRequest {
1250         return tracker.AnnounceRequest{
1251                 Event:    tracker.None,
1252                 NumWant:  -1,
1253                 Port:     uint16(t.cl.incomingPeerPort()),
1254                 PeerId:   t.cl.peerID,
1255                 InfoHash: t.infoHash,
1256                 Left:     t.bytesLeftAnnounce(),
1257         }
1258 }
1259
1260 // Adds peers revealed in an announce until the announce ends, or we have
1261 // enough peers.
1262 func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
1263         cl := t.cl
1264         // Count all the unique addresses we got during this announce.
1265         allAddrs := make(map[string]struct{})
1266         for {
1267                 select {
1268                 case v, ok := <-pvs:
1269                         if !ok {
1270                                 return
1271                         }
1272                         addPeers := make([]Peer, 0, len(v.Peers))
1273                         for _, cp := range v.Peers {
1274                                 if cp.Port == 0 {
1275                                         // Can't do anything with this.
1276                                         continue
1277                                 }
1278                                 addPeers = append(addPeers, Peer{
1279                                         IP:     cp.IP[:],
1280                                         Port:   cp.Port,
1281                                         Source: peerSourceDHTGetPeers,
1282                                 })
1283                                 key := (&net.UDPAddr{
1284                                         IP:   cp.IP[:],
1285                                         Port: cp.Port,
1286                                 }).String()
1287                                 allAddrs[key] = struct{}{}
1288                         }
1289                         cl.mu.Lock()
1290                         t.addPeers(addPeers)
1291                         numPeers := len(t.peers)
1292                         cl.mu.Unlock()
1293                         if numPeers >= cl.config.TorrentPeersHighWater {
1294                                 return
1295                         }
1296                 case <-t.closed.LockedChan(&cl.mu):
1297                         return
1298                 }
1299         }
1300 }
1301
1302 func (t *Torrent) announceDHT(impliedPort bool) (err error) {
1303         cl := t.cl
1304         ps, err := cl.dHT.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort)
1305         if err != nil {
1306                 return
1307         }
1308         t.consumeDHTAnnounce(ps.Peers)
1309         ps.Close()
1310         return
1311 }
1312
1313 func (t *Torrent) dhtAnnouncer() {
1314         cl := t.cl
1315         for {
1316                 select {
1317                 case <-t.wantPeersEvent.LockedChan(&cl.mu):
1318                 case <-t.closed.LockedChan(&cl.mu):
1319                         return
1320                 }
1321                 err := t.announceDHT(true)
1322                 func() {
1323                         cl.mu.Lock()
1324                         defer cl.mu.Unlock()
1325                         if err == nil {
1326                                 t.numDHTAnnounces++
1327                         } else {
1328                                 log.Printf("error announcing %q to DHT: %s", t, err)
1329                         }
1330                 }()
1331                 select {
1332                 case <-t.closed.LockedChan(&cl.mu):
1333                         return
1334                 case <-time.After(5 * time.Minute):
1335                 }
1336         }
1337 }
1338
1339 func (t *Torrent) addPeers(peers []Peer) {
1340         for _, p := range peers {
1341                 if t.cl.badPeerIPPort(p.IP, p.Port) {
1342                         continue
1343                 }
1344                 t.addPeer(p)
1345         }
1346 }
1347
1348 func (t *Torrent) Stats() TorrentStats {
1349         t.cl.mu.Lock()
1350         defer t.cl.mu.Unlock()
1351         return t.statsLocked()
1352 }
1353
1354 func (t *Torrent) statsLocked() TorrentStats {
1355         t.stats.ActivePeers = len(t.conns)
1356         t.stats.HalfOpenPeers = len(t.halfOpen)
1357         t.stats.PendingPeers = len(t.peers)
1358         t.stats.TotalPeers = t.numTotalPeers()
1359         t.stats.ConnectedSeeders = 0
1360         for c := range t.conns {
1361                 if all, ok := c.peerHasAllPieces(); all && ok {
1362                         t.stats.ConnectedSeeders++
1363                 }
1364         }
1365         return t.stats
1366 }
1367
1368 // The total number of peers in the torrent.
1369 func (t *Torrent) numTotalPeers() int {
1370         peers := make(map[string]struct{})
1371         for conn := range t.conns {
1372                 ra := conn.conn.RemoteAddr()
1373                 if ra == nil {
1374                         // It's been closed and doesn't support RemoteAddr.
1375                         continue
1376                 }
1377                 peers[ra.String()] = struct{}{}
1378         }
1379         for addr := range t.halfOpen {
1380                 peers[addr] = struct{}{}
1381         }
1382         for _, peer := range t.peers {
1383                 peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{}
1384         }
1385         return len(peers)
1386 }
1387
1388 // Returns true if the connection is added.
1389 func (t *Torrent) addConnection(c *connection, outgoing bool) bool {
1390         if t.cl.closed.IsSet() {
1391                 return false
1392         }
1393         if !t.wantConns() {
1394                 return false
1395         }
1396         for c0 := range t.conns {
1397                 if c.PeerID == c0.PeerID {
1398                         // Already connected to a client with that ID.
1399                         duplicateClientConns.Add(1)
1400                         lower := string(t.cl.peerID[:]) < string(c.PeerID[:])
1401                         // Retain the connection from initiated from lower peer ID to
1402                         // higher.
1403                         if outgoing == lower {
1404                                 // Close the other one.
1405                                 c0.Close()
1406                                 // TODO: Is it safe to delete from the map while we're
1407                                 // iterating over it?
1408                                 t.deleteConnection(c0)
1409                         } else {
1410                                 // Abandon this one.
1411                                 return false
1412                         }
1413                 }
1414         }
1415         if len(t.conns) >= t.maxEstablishedConns {
1416                 c := t.worstBadConn()
1417                 if c == nil {
1418                         return false
1419                 }
1420                 if t.cl.config.Debug && missinggo.CryHeard() {
1421                         log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
1422                 }
1423                 c.Close()
1424                 t.deleteConnection(c)
1425         }
1426         if len(t.conns) >= t.maxEstablishedConns {
1427                 panic(len(t.conns))
1428         }
1429         if c.t != nil {
1430                 panic("connection already associated with a torrent")
1431         }
1432         // Reconcile bytes transferred before connection was associated with a
1433         // torrent.
1434         t.stats.wroteBytes(c.stats.BytesWritten)
1435         t.stats.readBytes(c.stats.BytesRead)
1436         c.t = t
1437         t.conns[c] = struct{}{}
1438         return true
1439 }
1440
1441 func (t *Torrent) wantConns() bool {
1442         if !t.networkingEnabled {
1443                 return false
1444         }
1445         if t.closed.IsSet() {
1446                 return false
1447         }
1448         if !t.seeding() && !t.needData() {
1449                 return false
1450         }
1451         if len(t.conns) < t.maxEstablishedConns {
1452                 return true
1453         }
1454         return t.worstBadConn() != nil
1455 }
1456
1457 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1458         t.cl.mu.Lock()
1459         defer t.cl.mu.Unlock()
1460         oldMax = t.maxEstablishedConns
1461         t.maxEstablishedConns = max
1462         wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
1463         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1464                 t.dropConnection(wcs.Pop().(*connection))
1465         }
1466         t.openNewConns()
1467         return oldMax
1468 }
1469
1470 func (t *Torrent) mu() missinggo.RWLocker {
1471         return &t.cl.mu
1472 }
1473
1474 func (t *Torrent) pieceHashed(piece int, correct bool) {
1475         log.Fmsg("hashed piece %d", piece).Add("piece", piece).Add("passed", correct).AddValue(debugLogValue).Log(t.logger)
1476         if t.closed.IsSet() {
1477                 return
1478         }
1479         p := &t.pieces[piece]
1480         touchers := t.reapPieceTouchers(piece)
1481         if p.everHashed {
1482                 // Don't score the first time a piece is hashed, it could be an
1483                 // initial check.
1484                 if correct {
1485                         pieceHashedCorrect.Add(1)
1486                 } else {
1487                         log.Printf("%s: piece %d (%s) failed hash: %d connections contributed", t, piece, p.hash, len(touchers))
1488                         pieceHashedNotCorrect.Add(1)
1489                 }
1490         }
1491         p.everHashed = true
1492         if correct {
1493                 for _, c := range touchers {
1494                         c.goodPiecesDirtied++
1495                 }
1496                 err := p.Storage().MarkComplete()
1497                 if err != nil {
1498                         log.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
1499                 }
1500         } else {
1501                 if len(touchers) != 0 {
1502                         for _, c := range touchers {
1503                                 // Y u do dis peer?!
1504                                 c.badPiecesDirtied++
1505                         }
1506                         slices.Sort(touchers, connLessTrusted)
1507                         if t.cl.config.Debug {
1508                                 log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int) {
1509                                         for _, c := range touchers {
1510                                                 ret = append(ret, c.netGoodPiecesDirtied())
1511                                         }
1512                                         return
1513                                 }())
1514                         }
1515                         c := touchers[0]
1516                         t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
1517                         c.Drop()
1518                 }
1519                 t.onIncompletePiece(piece)
1520                 p.Storage().MarkNotComplete()
1521         }
1522         t.updatePieceCompletion(piece)
1523 }
1524
1525 func (t *Torrent) cancelRequestsForPiece(piece int) {
1526         // TODO: Make faster
1527         for cn := range t.conns {
1528                 cn.tickleWriter()
1529         }
1530 }
1531
1532 func (t *Torrent) onPieceCompleted(piece int) {
1533         t.pendAllChunkSpecs(piece)
1534         t.cancelRequestsForPiece(piece)
1535         for conn := range t.conns {
1536                 conn.Have(piece)
1537         }
1538 }
1539
1540 // Called when a piece is found to be not complete.
1541 func (t *Torrent) onIncompletePiece(piece int) {
1542         if t.pieceAllDirty(piece) {
1543                 t.pendAllChunkSpecs(piece)
1544         }
1545         if !t.wantPieceIndex(piece) {
1546                 // log.Printf("piece %d incomplete and unwanted", piece)
1547                 return
1548         }
1549         // We could drop any connections that we told we have a piece that we
1550         // don't here. But there's a test failure, and it seems clients don't care
1551         // if you request pieces that you already claim to have. Pruning bad
1552         // connections might just remove any connections that aren't treating us
1553         // favourably anyway.
1554
1555         // for c := range t.conns {
1556         //      if c.sentHave(piece) {
1557         //              c.Drop()
1558         //      }
1559         // }
1560         for conn := range t.conns {
1561                 if conn.PeerHasPiece(piece) {
1562                         conn.updateRequests()
1563                 }
1564         }
1565 }
1566
1567 func (t *Torrent) verifyPiece(piece int) {
1568         cl := t.cl
1569         cl.mu.Lock()
1570         defer cl.mu.Unlock()
1571         p := &t.pieces[piece]
1572         defer func() {
1573                 p.numVerifies++
1574                 cl.event.Broadcast()
1575         }()
1576         for p.hashing || t.storage == nil {
1577                 cl.event.Wait()
1578         }
1579         if !p.t.piecesQueuedForHash.Remove(piece) {
1580                 panic("piece was not queued")
1581         }
1582         if t.closed.IsSet() || t.pieceComplete(piece) {
1583                 t.updatePiecePriority(piece)
1584                 return
1585         }
1586         p.hashing = true
1587         t.publishPieceChange(piece)
1588         t.storageLock.RLock()
1589         cl.mu.Unlock()
1590         sum := t.hashPiece(piece)
1591         t.storageLock.RUnlock()
1592         cl.mu.Lock()
1593         p.hashing = false
1594         t.pieceHashed(piece, sum == p.hash)
1595         t.publishPieceChange(piece)
1596 }
1597
1598 // Return the connections that touched a piece, and clear the entry while
1599 // doing it.
1600 func (t *Torrent) reapPieceTouchers(piece int) (ret []*connection) {
1601         for c := range t.conns {
1602                 if _, ok := c.peerTouchedPieces[piece]; ok {
1603                         ret = append(ret, c)
1604                         delete(c.peerTouchedPieces, piece)
1605                 }
1606         }
1607         return
1608 }
1609
1610 func (t *Torrent) connsAsSlice() (ret []*connection) {
1611         for c := range t.conns {
1612                 ret = append(ret, c)
1613         }
1614         return
1615 }
1616
1617 // Currently doesn't really queue, but should in the future.
1618 func (t *Torrent) queuePieceCheck(pieceIndex int) {
1619         piece := &t.pieces[pieceIndex]
1620         if piece.queuedForHash() {
1621                 return
1622         }
1623         t.piecesQueuedForHash.Add(pieceIndex)
1624         t.publishPieceChange(pieceIndex)
1625         go t.verifyPiece(pieceIndex)
1626 }
1627
1628 func (t *Torrent) VerifyData() {
1629         for i := range iter.N(t.NumPieces()) {
1630                 t.Piece(i).VerifyData()
1631         }
1632 }