]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
IPv6 tracker support
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "crypto/sha1"
6         "errors"
7         "fmt"
8         "io"
9         "math"
10         "math/rand"
11         "net"
12         "net/url"
13         "os"
14         "strconv"
15         "sync"
16         "text/tabwriter"
17         "time"
18
19         "github.com/anacrolix/dht"
20         "github.com/anacrolix/dht/krpc"
21         "github.com/anacrolix/log"
22         "github.com/anacrolix/missinggo"
23         "github.com/anacrolix/missinggo/bitmap"
24         "github.com/anacrolix/missinggo/perf"
25         "github.com/anacrolix/missinggo/prioritybitmap"
26         "github.com/anacrolix/missinggo/pubsub"
27         "github.com/anacrolix/missinggo/slices"
28         "github.com/bradfitz/iter"
29         "github.com/davecgh/go-spew/spew"
30
31         "github.com/anacrolix/torrent/bencode"
32         "github.com/anacrolix/torrent/metainfo"
33         pp "github.com/anacrolix/torrent/peer_protocol"
34         "github.com/anacrolix/torrent/storage"
35         "github.com/anacrolix/torrent/tracker"
36 )
37
38 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
39         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
40 }
41
42 type peersKey struct {
43         IPBytes string
44         Port    int
45 }
46
47 // Maintains state of torrent within a Client.
48 type Torrent struct {
49         cl     *Client
50         logger *log.Logger
51
52         networkingEnabled bool
53         // Determines what chunks to request from peers. 1: Favour higher priority
54         // pieces with some fuzzing to reduce overlaps and wastage across
55         // connections. 2: The fastest connection downloads strictly in order of
56         // priority, while all others adher to their piece inclications.
57         requestStrategy int
58
59         closed   missinggo.Event
60         infoHash metainfo.Hash
61         pieces   []Piece
62         // Values are the piece indices that changed.
63         pieceStateChanges *pubsub.PubSub
64         // The size of chunks to request from peers over the wire. This is
65         // normally 16KiB by convention these days.
66         chunkSize pp.Integer
67         chunkPool *sync.Pool
68         // Total length of the torrent in bytes. Stored because it's not O(1) to
69         // get this from the info dict.
70         length *int64
71
72         // The storage to open when the info dict becomes available.
73         storageOpener *storage.Client
74         // Storage for torrent data.
75         storage *storage.Torrent
76         // Read-locked for using storage, and write-locked for Closing.
77         storageLock sync.RWMutex
78
79         // TODO: Only announce stuff is used?
80         metainfo metainfo.MetaInfo
81
82         // The info dict. nil if we don't have it (yet).
83         info  *metainfo.Info
84         files *[]*File
85
86         // Active peer connections, running message stream loops. TODO: Make this
87         // open (not-closed) connections only.
88         conns               map[*connection]struct{}
89         maxEstablishedConns int
90         // Set of addrs to which we're attempting to connect. Connections are
91         // half-open until all handshakes are completed.
92         halfOpen    map[string]Peer
93         fastestConn *connection
94
95         // Reserve of peers to connect to. A peer can be both here and in the
96         // active connections if were told about the peer after connecting with
97         // them. That encourages us to reconnect to peers that are well known in
98         // the swarm.
99         peers          map[peersKey]Peer
100         wantPeersEvent missinggo.Event
101         // An announcer for each tracker URL.
102         trackerAnnouncers map[string]*trackerScraper
103         // How many times we've initiated a DHT announce. TODO: Move into stats.
104         numDHTAnnounces int
105
106         // Name used if the info name isn't available. Should be cleared when the
107         // Info does become available.
108         displayName string
109
110         // The bencoded bytes of the info dict. This is actively manipulated if
111         // the info bytes aren't initially available, and we try to fetch them
112         // from peers.
113         metadataBytes []byte
114         // Each element corresponds to the 16KiB metadata pieces. If true, we have
115         // received that piece.
116         metadataCompletedChunks []bool
117         metadataChanged         sync.Cond
118
119         // Set when .Info is obtained.
120         gotMetainfo missinggo.Event
121
122         readers               map[*reader]struct{}
123         readerNowPieces       bitmap.Bitmap
124         readerReadaheadPieces bitmap.Bitmap
125
126         // A cache of pieces we need to get. Calculated from various piece and
127         // file priorities and completion states elsewhere.
128         pendingPieces prioritybitmap.PriorityBitmap
129         // A cache of completed piece indices.
130         completedPieces bitmap.Bitmap
131         // Pieces that need to be hashed.
132         piecesQueuedForHash bitmap.Bitmap
133
134         // A pool of piece priorities []int for assignment to new connections.
135         // These "inclinations" are used to give connections preference for
136         // different pieces.
137         connPieceInclinationPool sync.Pool
138         // Torrent-level statistics.
139         stats TorrentStats
140
141         // Count of each request across active connections.
142         pendingRequests map[request]int
143 }
144
145 func (t *Torrent) tickleReaders() {
146         t.cl.event.Broadcast()
147 }
148
149 // Returns a channel that is closed when the Torrent is closed.
150 func (t *Torrent) Closed() <-chan struct{} {
151         return t.closed.LockedChan(&t.cl.mu)
152 }
153
154 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
155 // pending, and half-open peers.
156 func (t *Torrent) KnownSwarm() (ks []Peer) {
157         // Add pending peers to the list
158         for _, peer := range t.peers {
159                 ks = append(ks, peer)
160         }
161
162         // Add half-open peers to the list
163         for _, peer := range t.halfOpen {
164                 ks = append(ks, peer)
165         }
166
167         // Add active peers to the list
168         for conn := range t.conns {
169                 host, portString, err := net.SplitHostPort(conn.remoteAddr().String())
170                 if err != nil {
171                         panic(err)
172                 }
173
174                 ip := net.ParseIP(host)
175                 port, err := strconv.Atoi(portString)
176                 if err != nil {
177                         panic(err)
178                 }
179
180                 ks = append(ks, Peer{
181                         Id:     conn.PeerID,
182                         IP:     ip,
183                         Port:   port,
184                         Source: conn.Discovery,
185                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
186                         // > But if we're not connected to them with an encrypted connection, I couldn't say
187                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
188                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
189                         // > connection. It's probably easiest to do the latter for now.
190                         // https://github.com/anacrolix/torrent/pull/188
191                         SupportsEncryption: conn.headerEncrypted,
192                 })
193         }
194
195         return
196 }
197
198 func (t *Torrent) setChunkSize(size pp.Integer) {
199         t.chunkSize = size
200         t.chunkPool = &sync.Pool{
201                 New: func() interface{} {
202                         b := make([]byte, size)
203                         return &b
204                 },
205         }
206 }
207
208 func (t *Torrent) setDisplayName(dn string) {
209         if t.haveInfo() {
210                 return
211         }
212         t.displayName = dn
213 }
214
215 func (t *Torrent) pieceComplete(piece int) bool {
216         return t.completedPieces.Get(piece)
217 }
218
219 func (t *Torrent) pieceCompleteUncached(piece int) storage.Completion {
220         return t.pieces[piece].Storage().Completion()
221 }
222
223 // There's a connection to that address already.
224 func (t *Torrent) addrActive(addr string) bool {
225         if _, ok := t.halfOpen[addr]; ok {
226                 return true
227         }
228         for c := range t.conns {
229                 ra := c.remoteAddr()
230                 if ra == nil {
231                         continue
232                 }
233                 if ra.String() == addr {
234                         return true
235                 }
236         }
237         return false
238 }
239
240 func (t *Torrent) unclosedConnsAsSlice() (ret []*connection) {
241         ret = make([]*connection, 0, len(t.conns))
242         for c := range t.conns {
243                 if !c.closed.IsSet() {
244                         ret = append(ret, c)
245                 }
246         }
247         return
248 }
249
250 func (t *Torrent) addPeer(p Peer) {
251         cl := t.cl
252         peersAddedBySource.Add(string(p.Source), 1)
253         if cl.badPeerIPPort(p.IP, p.Port) {
254                 torrent.Add("peers not added because of bad addr", 1)
255                 return
256         }
257         t.openNewConns()
258         if len(t.peers) >= cl.config.TorrentPeersHighWater {
259                 return
260         }
261         t.peers[peersKey{string(p.IP), p.Port}] = p
262         t.openNewConns()
263
264 }
265
266 func (t *Torrent) invalidateMetadata() {
267         for i := range t.metadataCompletedChunks {
268                 t.metadataCompletedChunks[i] = false
269         }
270         t.info = nil
271 }
272
273 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
274         if t.haveInfo() {
275                 return
276         }
277         if index >= len(t.metadataCompletedChunks) {
278                 log.Printf("%s: ignoring metadata piece %d", t, index)
279                 return
280         }
281         copy(t.metadataBytes[(1<<14)*index:], data)
282         t.metadataCompletedChunks[index] = true
283 }
284
285 func (t *Torrent) metadataPieceCount() int {
286         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
287 }
288
289 func (t *Torrent) haveMetadataPiece(piece int) bool {
290         if t.haveInfo() {
291                 return (1<<14)*piece < len(t.metadataBytes)
292         } else {
293                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
294         }
295 }
296
297 func (t *Torrent) metadataSizeKnown() bool {
298         return t.metadataBytes != nil
299 }
300
301 func (t *Torrent) metadataSize() int {
302         return len(t.metadataBytes)
303 }
304
305 func infoPieceHashes(info *metainfo.Info) (ret []string) {
306         for i := 0; i < len(info.Pieces); i += sha1.Size {
307                 ret = append(ret, string(info.Pieces[i:i+sha1.Size]))
308         }
309         return
310 }
311
312 func (t *Torrent) makePieces() {
313         hashes := infoPieceHashes(t.info)
314         t.pieces = make([]Piece, len(hashes))
315         for i, hash := range hashes {
316                 piece := &t.pieces[i]
317                 piece.t = t
318                 piece.index = i
319                 piece.noPendingWrites.L = &piece.pendingWritesMutex
320                 missinggo.CopyExact(piece.hash[:], hash)
321                 files := *t.files
322                 beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
323                 endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
324                 piece.files = files[beginFile:endFile]
325         }
326 }
327
328 // Returns the index of the first file containing the piece. files must be
329 // ordered by offset.
330 func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
331         for i, f := range files {
332                 if f.offset+f.length > pieceOffset {
333                         return i
334                 }
335         }
336         return 0
337 }
338
339 // Returns the index after the last file containing the piece. files must be
340 // ordered by offset.
341 func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
342         for i, f := range files {
343                 if f.offset+f.length >= pieceEndOffset {
344                         return i + 1
345                 }
346         }
347         return 0
348 }
349
350 func (t *Torrent) cacheLength() {
351         var l int64
352         for _, f := range t.info.UpvertedFiles() {
353                 l += f.Length
354         }
355         t.length = &l
356 }
357
358 func (t *Torrent) setInfo(info *metainfo.Info) error {
359         if err := validateInfo(info); err != nil {
360                 return fmt.Errorf("bad info: %s", err)
361         }
362         if t.storageOpener != nil {
363                 var err error
364                 t.storage, err = t.storageOpener.OpenTorrent(info, t.infoHash)
365                 if err != nil {
366                         return fmt.Errorf("error opening torrent storage: %s", err)
367                 }
368         }
369         t.info = info
370         t.displayName = "" // Save a few bytes lol.
371         t.initFiles()
372         t.cacheLength()
373         t.makePieces()
374         return nil
375 }
376
377 func (t *Torrent) onSetInfo() {
378         for conn := range t.conns {
379                 if err := conn.setNumPieces(t.numPieces()); err != nil {
380                         log.Printf("closing connection: %s", err)
381                         conn.Close()
382                 }
383         }
384         for i := range t.pieces {
385                 t.updatePieceCompletion(i)
386                 p := &t.pieces[i]
387                 if !p.storageCompletionOk {
388                         // log.Printf("piece %s completion unknown, queueing check", p)
389                         t.queuePieceCheck(i)
390                 }
391         }
392         t.cl.event.Broadcast()
393         t.gotMetainfo.Set()
394         t.updateWantPeersEvent()
395         t.pendingRequests = make(map[request]int)
396 }
397
398 // Called when metadata for a torrent becomes available.
399 func (t *Torrent) setInfoBytes(b []byte) error {
400         if metainfo.HashBytes(b) != t.infoHash {
401                 return errors.New("info bytes have wrong hash")
402         }
403         var info metainfo.Info
404         if err := bencode.Unmarshal(b, &info); err != nil {
405                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
406         }
407         if err := t.setInfo(&info); err != nil {
408                 return err
409         }
410         t.metadataBytes = b
411         t.metadataCompletedChunks = nil
412         t.onSetInfo()
413         return nil
414 }
415
416 func (t *Torrent) haveAllMetadataPieces() bool {
417         if t.haveInfo() {
418                 return true
419         }
420         if t.metadataCompletedChunks == nil {
421                 return false
422         }
423         for _, have := range t.metadataCompletedChunks {
424                 if !have {
425                         return false
426                 }
427         }
428         return true
429 }
430
431 // TODO: Propagate errors to disconnect peer.
432 func (t *Torrent) setMetadataSize(bytes int64) (err error) {
433         if t.haveInfo() {
434                 // We already know the correct metadata size.
435                 return
436         }
437         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
438                 return errors.New("bad size")
439         }
440         if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
441                 return
442         }
443         t.metadataBytes = make([]byte, bytes)
444         t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
445         t.metadataChanged.Broadcast()
446         for c := range t.conns {
447                 c.requestPendingMetadata()
448         }
449         return
450 }
451
452 // The current working name for the torrent. Either the name in the info dict,
453 // or a display name given such as by the dn value in a magnet link, or "".
454 func (t *Torrent) name() string {
455         if t.haveInfo() {
456                 return t.info.Name
457         }
458         return t.displayName
459 }
460
461 func (t *Torrent) pieceState(index int) (ret PieceState) {
462         p := &t.pieces[index]
463         ret.Priority = t.piecePriority(index)
464         ret.Completion = p.completion()
465         if p.queuedForHash() || p.hashing {
466                 ret.Checking = true
467         }
468         if !ret.Complete && t.piecePartiallyDownloaded(index) {
469                 ret.Partial = true
470         }
471         return
472 }
473
474 func (t *Torrent) metadataPieceSize(piece int) int {
475         return metadataPieceSize(len(t.metadataBytes), piece)
476 }
477
478 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
479         d := map[string]int{
480                 "msg_type": msgType,
481                 "piece":    piece,
482         }
483         if data != nil {
484                 d["total_size"] = len(t.metadataBytes)
485         }
486         p, err := bencode.Marshal(d)
487         if err != nil {
488                 panic(err)
489         }
490         return pp.Message{
491                 Type:            pp.Extended,
492                 ExtendedID:      c.PeerExtensionIDs["ut_metadata"],
493                 ExtendedPayload: append(p, data...),
494         }
495 }
496
497 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
498         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
499                 ret = append(ret, PieceStateRun{
500                         PieceState: el.(PieceState),
501                         Length:     int(count),
502                 })
503         })
504         for index := range t.pieces {
505                 rle.Append(t.pieceState(index), 1)
506         }
507         rle.Flush()
508         return
509 }
510
511 // Produces a small string representing a PieceStateRun.
512 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
513         ret = fmt.Sprintf("%d", psr.Length)
514         ret += func() string {
515                 switch psr.Priority {
516                 case PiecePriorityNext:
517                         return "N"
518                 case PiecePriorityNormal:
519                         return "."
520                 case PiecePriorityReadahead:
521                         return "R"
522                 case PiecePriorityNow:
523                         return "!"
524                 case PiecePriorityHigh:
525                         return "H"
526                 default:
527                         return ""
528                 }
529         }()
530         if psr.Checking {
531                 ret += "H"
532         }
533         if psr.Partial {
534                 ret += "P"
535         }
536         if psr.Complete {
537                 ret += "C"
538         }
539         if !psr.Ok {
540                 ret += "?"
541         }
542         return
543 }
544
545 func (t *Torrent) writeStatus(w io.Writer) {
546         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
547         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
548         if !t.haveInfo() {
549                 fmt.Fprintf(w, "Metadata have: ")
550                 for _, h := range t.metadataCompletedChunks {
551                         fmt.Fprintf(w, "%c", func() rune {
552                                 if h {
553                                         return 'H'
554                                 } else {
555                                         return '.'
556                                 }
557                         }())
558                 }
559                 fmt.Fprintln(w)
560         }
561         fmt.Fprintf(w, "Piece length: %s\n", func() string {
562                 if t.haveInfo() {
563                         return fmt.Sprint(t.usualPieceSize())
564                 } else {
565                         return "?"
566                 }
567         }())
568         if t.info != nil {
569                 fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
570                 fmt.Fprint(w, "Piece States:")
571                 for _, psr := range t.pieceStateRuns() {
572                         w.Write([]byte(" "))
573                         w.Write([]byte(pieceStateRunStatusChars(psr)))
574                 }
575                 fmt.Fprintln(w)
576         }
577         fmt.Fprintf(w, "Reader Pieces:")
578         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
579                 fmt.Fprintf(w, " %d:%d", begin, end)
580                 return true
581         })
582         fmt.Fprintln(w)
583
584         fmt.Fprintf(w, "Enabled trackers:\n")
585         func() {
586                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
587                 fmt.Fprintf(tw, "    URL\tNext announce\tLast announce\n")
588                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
589                         return l.u.String() < r.u.String()
590                 }).([]*trackerScraper) {
591                         fmt.Fprintf(tw, "    %s\n", ta.statusLine())
592                 }
593                 tw.Flush()
594         }()
595
596         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
597
598         spew.NewDefaultConfig()
599         spew.Fdump(w, t.statsLocked())
600
601         conns := t.connsAsSlice()
602         slices.Sort(conns, worseConn)
603         for i, c := range conns {
604                 fmt.Fprintf(w, "%2d. ", i+1)
605                 c.WriteStatus(w, t)
606         }
607 }
608
609 func (t *Torrent) haveInfo() bool {
610         return t.info != nil
611 }
612
613 // Returns a run-time generated MetaInfo that includes the info bytes and
614 // announce-list as currently known to the client.
615 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
616         return metainfo.MetaInfo{
617                 CreationDate: time.Now().Unix(),
618                 Comment:      "dynamic metainfo from client",
619                 CreatedBy:    "go.torrent",
620                 AnnounceList: t.metainfo.UpvertedAnnounceList(),
621                 InfoBytes: func() []byte {
622                         if t.haveInfo() {
623                                 return t.metadataBytes
624                         } else {
625                                 return nil
626                         }
627                 }(),
628         }
629 }
630
631 func (t *Torrent) BytesMissing() int64 {
632         t.mu().RLock()
633         defer t.mu().RUnlock()
634         return t.bytesMissingLocked()
635 }
636
637 func (t *Torrent) bytesMissingLocked() int64 {
638         return t.bytesLeft()
639 }
640
641 func (t *Torrent) bytesLeft() (left int64) {
642         bitmap.Flip(t.completedPieces, 0, t.numPieces()).IterTyped(func(piece int) bool {
643                 p := &t.pieces[piece]
644                 left += int64(p.length() - p.numDirtyBytes())
645                 return true
646         })
647         return
648 }
649
650 // Bytes left to give in tracker announces.
651 func (t *Torrent) bytesLeftAnnounce() uint64 {
652         if t.haveInfo() {
653                 return uint64(t.bytesLeft())
654         } else {
655                 return math.MaxUint64
656         }
657 }
658
659 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
660         if t.pieceComplete(piece) {
661                 return false
662         }
663         if t.pieceAllDirty(piece) {
664                 return false
665         }
666         return t.pieces[piece].hasDirtyChunks()
667 }
668
669 func (t *Torrent) usualPieceSize() int {
670         return int(t.info.PieceLength)
671 }
672
673 func (t *Torrent) numPieces() int {
674         return t.info.NumPieces()
675 }
676
677 func (t *Torrent) numPiecesCompleted() (num int) {
678         return t.completedPieces.Len()
679 }
680
681 func (t *Torrent) close() (err error) {
682         t.closed.Set()
683         t.tickleReaders()
684         if t.storage != nil {
685                 t.storageLock.Lock()
686                 t.storage.Close()
687                 t.storageLock.Unlock()
688         }
689         for conn := range t.conns {
690                 conn.Close()
691         }
692         t.cl.event.Broadcast()
693         t.pieceStateChanges.Close()
694         t.updateWantPeersEvent()
695         return
696 }
697
698 func (t *Torrent) requestOffset(r request) int64 {
699         return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
700 }
701
702 // Return the request that would include the given offset into the torrent
703 // data. Returns !ok if there is no such request.
704 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
705         return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
706 }
707
708 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
709         tr := perf.NewTimer()
710
711         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
712         if err == nil && n != len(data) {
713                 err = io.ErrShortWrite
714         }
715         if err == nil {
716                 tr.Mark("write chunk")
717         }
718         return
719 }
720
721 func (t *Torrent) bitfield() (bf []bool) {
722         bf = make([]bool, t.numPieces())
723         t.completedPieces.IterTyped(func(piece int) (again bool) {
724                 bf[piece] = true
725                 return true
726         })
727         return
728 }
729
730 func (t *Torrent) pieceNumChunks(piece int) int {
731         return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
732 }
733
734 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
735         t.pieces[pieceIndex].dirtyChunks.Clear()
736 }
737
738 type Peer struct {
739         Id     [20]byte
740         IP     net.IP
741         Port   int
742         Source peerSource
743         // Peer is known to support encryption.
744         SupportsEncryption bool
745         pexPeerFlags
746 }
747
748 func (me *Peer) FromPex(na krpc.NodeAddr, fs pexPeerFlags) {
749         me.IP = append([]byte(nil), na.IP...)
750         me.Port = na.Port
751         me.Source = peerSourcePEX
752         // If they prefer encryption, they must support it.
753         if fs.Get(pexPrefersEncryption) {
754                 me.SupportsEncryption = true
755         }
756         me.pexPeerFlags = fs
757 }
758
759 func (t *Torrent) pieceLength(piece int) pp.Integer {
760         if t.info.PieceLength == 0 {
761                 // There will be no variance amongst pieces. Only pain.
762                 return 0
763         }
764         if piece == t.numPieces()-1 {
765                 ret := pp.Integer(*t.length % t.info.PieceLength)
766                 if ret != 0 {
767                         return ret
768                 }
769         }
770         return pp.Integer(t.info.PieceLength)
771 }
772
773 func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
774         hash := pieceHash.New()
775         p := &t.pieces[piece]
776         p.waitNoPendingWrites()
777         ip := t.info.Piece(piece)
778         pl := ip.Length()
779         n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
780         if n == pl {
781                 missinggo.CopyExact(&ret, hash.Sum(nil))
782                 return
783         }
784         if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
785                 log.Printf("unexpected error hashing piece with %T: %s", t.storage.TorrentImpl, err)
786         }
787         return
788 }
789
790 func (t *Torrent) haveAnyPieces() bool {
791         return t.completedPieces.Len() != 0
792 }
793
794 func (t *Torrent) haveAllPieces() bool {
795         if !t.haveInfo() {
796                 return false
797         }
798         return t.completedPieces.Len() == t.numPieces()
799 }
800
801 func (t *Torrent) havePiece(index int) bool {
802         return t.haveInfo() && t.pieceComplete(index)
803 }
804
805 func (t *Torrent) haveChunk(r request) (ret bool) {
806         // defer func() {
807         //      log.Println("have chunk", r, ret)
808         // }()
809         if !t.haveInfo() {
810                 return false
811         }
812         if t.pieceComplete(int(r.Index)) {
813                 return true
814         }
815         p := &t.pieces[r.Index]
816         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
817 }
818
819 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
820         return int(cs.Begin / chunkSize)
821 }
822
823 func (t *Torrent) wantPiece(r request) bool {
824         if !t.wantPieceIndex(int(r.Index)) {
825                 return false
826         }
827         if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
828                 return true
829         }
830         // TODO: What about pieces that were wanted, but aren't now, and aren't
831         // completed either? That used to be done here.
832         return false
833 }
834
835 func (t *Torrent) wantPieceIndex(index int) bool {
836         if !t.haveInfo() {
837                 return false
838         }
839         if index < 0 || index >= t.numPieces() {
840                 return false
841         }
842         p := &t.pieces[index]
843         if p.queuedForHash() {
844                 return false
845         }
846         if p.hashing {
847                 return false
848         }
849         if t.pieceComplete(index) {
850                 return false
851         }
852         if t.pendingPieces.Contains(index) {
853                 return true
854         }
855         // log.Printf("piece %d not pending", index)
856         return !t.forReaderOffsetPieces(func(begin, end int) bool {
857                 return index < begin || index >= end
858         })
859 }
860
861 // The worst connection is one that hasn't been sent, or sent anything useful
862 // for the longest. A bad connection is one that usually sends us unwanted
863 // pieces, or has been in worser half of the established connections for more
864 // than a minute.
865 func (t *Torrent) worstBadConn() *connection {
866         wcs := worseConnSlice{t.unclosedConnsAsSlice()}
867         heap.Init(&wcs)
868         for wcs.Len() != 0 {
869                 c := heap.Pop(&wcs).(*connection)
870                 if c.stats.ChunksReadUnwanted >= 6 && c.stats.ChunksReadUnwanted > c.stats.ChunksReadUseful {
871                         return c
872                 }
873                 // If the connection is in the worst half of the established
874                 // connection quota and is older than a minute.
875                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
876                         // Give connections 1 minute to prove themselves.
877                         if time.Since(c.completedHandshake) > time.Minute {
878                                 return c
879                         }
880                 }
881         }
882         return nil
883 }
884
885 type PieceStateChange struct {
886         Index int
887         PieceState
888 }
889
890 func (t *Torrent) publishPieceChange(piece int) {
891         cur := t.pieceState(piece)
892         p := &t.pieces[piece]
893         if cur != p.publicPieceState {
894                 p.publicPieceState = cur
895                 t.pieceStateChanges.Publish(PieceStateChange{
896                         piece,
897                         cur,
898                 })
899         }
900 }
901
902 func (t *Torrent) pieceNumPendingChunks(piece int) int {
903         if t.pieceComplete(piece) {
904                 return 0
905         }
906         return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
907 }
908
909 func (t *Torrent) pieceAllDirty(piece int) bool {
910         return t.pieces[piece].dirtyChunks.Len() == t.pieceNumChunks(piece)
911 }
912
913 func (t *Torrent) readersChanged() {
914         t.updateReaderPieces()
915         t.updateAllPiecePriorities()
916 }
917
918 func (t *Torrent) updateReaderPieces() {
919         t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities()
920 }
921
922 func (t *Torrent) readerPosChanged(from, to pieceRange) {
923         if from == to {
924                 return
925         }
926         t.updateReaderPieces()
927         // Order the ranges, high and low.
928         l, h := from, to
929         if l.begin > h.begin {
930                 l, h = h, l
931         }
932         if l.end < h.begin {
933                 // Two distinct ranges.
934                 t.updatePiecePriorities(l.begin, l.end)
935                 t.updatePiecePriorities(h.begin, h.end)
936         } else {
937                 // Ranges overlap.
938                 end := l.end
939                 if h.end > end {
940                         end = h.end
941                 }
942                 t.updatePiecePriorities(l.begin, end)
943         }
944 }
945
946 func (t *Torrent) maybeNewConns() {
947         // Tickle the accept routine.
948         t.cl.event.Broadcast()
949         t.openNewConns()
950 }
951
952 func (t *Torrent) piecePriorityChanged(piece int) {
953         // log.Printf("piece %d priority changed", piece)
954         for c := range t.conns {
955                 if c.updatePiecePriority(piece) {
956                         // log.Print("conn piece priority changed")
957                         c.updateRequests()
958                 }
959         }
960         t.maybeNewConns()
961         t.publishPieceChange(piece)
962 }
963
964 func (t *Torrent) updatePiecePriority(piece int) {
965         p := &t.pieces[piece]
966         newPrio := p.uncachedPriority()
967         // log.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
968         if newPrio == PiecePriorityNone {
969                 if !t.pendingPieces.Remove(piece) {
970                         return
971                 }
972         } else {
973                 if !t.pendingPieces.Set(piece, newPrio.BitmapPriority()) {
974                         return
975                 }
976         }
977         t.piecePriorityChanged(piece)
978 }
979
980 func (t *Torrent) updateAllPiecePriorities() {
981         t.updatePiecePriorities(0, len(t.pieces))
982 }
983
984 // Update all piece priorities in one hit. This function should have the same
985 // output as updatePiecePriority, but across all pieces.
986 func (t *Torrent) updatePiecePriorities(begin, end int) {
987         for i := begin; i < end; i++ {
988                 t.updatePiecePriority(i)
989         }
990 }
991
992 // Returns the range of pieces [begin, end) that contains the extent of bytes.
993 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
994         if off >= *t.length {
995                 return
996         }
997         if off < 0 {
998                 size += off
999                 off = 0
1000         }
1001         if size <= 0 {
1002                 return
1003         }
1004         begin = int(off / t.info.PieceLength)
1005         end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
1006         if end > t.info.NumPieces() {
1007                 end = t.info.NumPieces()
1008         }
1009         return
1010 }
1011
1012 // Returns true if all iterations complete without breaking. Returns the read
1013 // regions for all readers. The reader regions should not be merged as some
1014 // callers depend on this method to enumerate readers.
1015 func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
1016         for r := range t.readers {
1017                 p := r.pieces
1018                 if p.begin >= p.end {
1019                         continue
1020                 }
1021                 if !f(p.begin, p.end) {
1022                         return false
1023                 }
1024         }
1025         return true
1026 }
1027
1028 func (t *Torrent) piecePriority(piece int) piecePriority {
1029         prio, ok := t.pendingPieces.GetPriority(piece)
1030         if !ok {
1031                 return PiecePriorityNone
1032         }
1033         if prio > 0 {
1034                 panic(prio)
1035         }
1036         ret := piecePriority(-prio)
1037         if ret == PiecePriorityNone {
1038                 panic(piece)
1039         }
1040         return ret
1041 }
1042
1043 func (t *Torrent) pendRequest(req request) {
1044         ci := chunkIndex(req.chunkSpec, t.chunkSize)
1045         t.pieces[req.Index].pendChunkIndex(ci)
1046 }
1047
1048 func (t *Torrent) pieceCompletionChanged(piece int) {
1049         log.Call().Add("piece", piece).AddValue(debugLogValue).Log(t.logger)
1050         t.cl.event.Broadcast()
1051         if t.pieceComplete(piece) {
1052                 t.onPieceCompleted(piece)
1053         } else {
1054                 t.onIncompletePiece(piece)
1055         }
1056         t.updatePiecePriority(piece)
1057 }
1058
1059 func (t *Torrent) numReceivedConns() (ret int) {
1060         for c := range t.conns {
1061                 if c.Discovery == peerSourceIncoming {
1062                         ret++
1063                 }
1064         }
1065         return
1066 }
1067
1068 func (t *Torrent) maxHalfOpen() int {
1069         // Note that if we somehow exceed the maximum established conns, we want
1070         // the negative value to have an effect.
1071         establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
1072         extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
1073         // We want to allow some experimentation with new peers, and to try to
1074         // upset an oversupply of received connections.
1075         return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.halfOpenLimit)))
1076 }
1077
1078 func (t *Torrent) openNewConns() {
1079         defer t.updateWantPeersEvent()
1080         for len(t.peers) != 0 {
1081                 if !t.wantConns() {
1082                         return
1083                 }
1084                 if len(t.halfOpen) >= t.maxHalfOpen() {
1085                         return
1086                 }
1087                 var (
1088                         k peersKey
1089                         p Peer
1090                 )
1091                 for k, p = range t.peers {
1092                         break
1093                 }
1094                 delete(t.peers, k)
1095                 t.initiateConn(p)
1096         }
1097 }
1098
1099 func (t *Torrent) getConnPieceInclination() []int {
1100         _ret := t.connPieceInclinationPool.Get()
1101         if _ret == nil {
1102                 pieceInclinationsNew.Add(1)
1103                 return rand.Perm(t.numPieces())
1104         }
1105         pieceInclinationsReused.Add(1)
1106         return *_ret.(*[]int)
1107 }
1108
1109 func (t *Torrent) putPieceInclination(pi []int) {
1110         t.connPieceInclinationPool.Put(&pi)
1111         pieceInclinationsPut.Add(1)
1112 }
1113
1114 func (t *Torrent) updatePieceCompletion(piece int) {
1115         pcu := t.pieceCompleteUncached(piece)
1116         p := &t.pieces[piece]
1117         changed := t.completedPieces.Get(piece) != pcu.Complete || p.storageCompletionOk != pcu.Ok
1118         log.Fmsg("piece %d completion: %v", piece, pcu.Ok).AddValue(debugLogValue).Log(t.logger)
1119         p.storageCompletionOk = pcu.Ok
1120         t.completedPieces.Set(piece, pcu.Complete)
1121         t.tickleReaders()
1122         // log.Printf("piece %d uncached completion: %v", piece, pcu.Complete)
1123         // log.Printf("piece %d changed: %v", piece, changed)
1124         if changed {
1125                 t.pieceCompletionChanged(piece)
1126         }
1127 }
1128
1129 // Non-blocking read. Client lock is not required.
1130 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1131         p := &t.pieces[off/t.info.PieceLength]
1132         p.waitNoPendingWrites()
1133         return p.Storage().ReadAt(b, off-p.Info().Offset())
1134 }
1135
1136 func (t *Torrent) updateAllPieceCompletions() {
1137         for i := range iter.N(t.numPieces()) {
1138                 t.updatePieceCompletion(i)
1139         }
1140 }
1141
1142 // Returns an error if the metadata was completed, but couldn't be set for
1143 // some reason. Blame it on the last peer to contribute.
1144 func (t *Torrent) maybeCompleteMetadata() error {
1145         if t.haveInfo() {
1146                 // Nothing to do.
1147                 return nil
1148         }
1149         if !t.haveAllMetadataPieces() {
1150                 // Don't have enough metadata pieces.
1151                 return nil
1152         }
1153         err := t.setInfoBytes(t.metadataBytes)
1154         if err != nil {
1155                 t.invalidateMetadata()
1156                 return fmt.Errorf("error setting info bytes: %s", err)
1157         }
1158         if t.cl.config.Debug {
1159                 log.Printf("%s: got metadata from peers", t)
1160         }
1161         return nil
1162 }
1163
1164 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1165         t.forReaderOffsetPieces(func(begin, end int) bool {
1166                 ret.AddRange(begin, end)
1167                 return true
1168         })
1169         return
1170 }
1171
1172 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1173         t.forReaderOffsetPieces(func(begin, end int) bool {
1174                 if end > begin {
1175                         now.Add(begin)
1176                         readahead.AddRange(begin+1, end)
1177                 }
1178                 return true
1179         })
1180         return
1181 }
1182
1183 func (t *Torrent) needData() bool {
1184         if t.closed.IsSet() {
1185                 return false
1186         }
1187         if !t.haveInfo() {
1188                 return true
1189         }
1190         return t.pendingPieces.Len() != 0
1191 }
1192
1193 func appendMissingStrings(old, new []string) (ret []string) {
1194         ret = old
1195 new:
1196         for _, n := range new {
1197                 for _, o := range old {
1198                         if o == n {
1199                                 continue new
1200                         }
1201                 }
1202                 ret = append(ret, n)
1203         }
1204         return
1205 }
1206
1207 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1208         ret = existing
1209         for minNumTiers > len(ret) {
1210                 ret = append(ret, nil)
1211         }
1212         return
1213 }
1214
1215 func (t *Torrent) addTrackers(announceList [][]string) {
1216         fullAnnounceList := &t.metainfo.AnnounceList
1217         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1218         for tierIndex, trackerURLs := range announceList {
1219                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1220         }
1221         t.startMissingTrackerScrapers()
1222         t.updateWantPeersEvent()
1223 }
1224
1225 // Don't call this before the info is available.
1226 func (t *Torrent) bytesCompleted() int64 {
1227         if !t.haveInfo() {
1228                 return 0
1229         }
1230         return t.info.TotalLength() - t.bytesLeft()
1231 }
1232
1233 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1234         t.cl.mu.Lock()
1235         defer t.cl.mu.Unlock()
1236         return t.setInfoBytes(b)
1237 }
1238
1239 // Returns true if connection is removed from torrent.Conns.
1240 func (t *Torrent) deleteConnection(c *connection) (ret bool) {
1241         if !c.closed.IsSet() {
1242                 panic("connection is not closed")
1243                 // There are behaviours prevented by the closed state that will fail
1244                 // if the connection has been deleted.
1245         }
1246         _, ret = t.conns[c]
1247         delete(t.conns, c)
1248         c.deleteAllRequests()
1249         if len(t.conns) == 0 {
1250                 t.assertNoPendingRequests()
1251         }
1252         return
1253 }
1254
1255 func (t *Torrent) assertNoPendingRequests() {
1256         for _, num := range t.pendingRequests {
1257                 if num != 0 {
1258                         panic(num)
1259                 }
1260         }
1261 }
1262
1263 func (t *Torrent) dropConnection(c *connection) {
1264         t.cl.event.Broadcast()
1265         c.Close()
1266         if t.deleteConnection(c) {
1267                 t.openNewConns()
1268         }
1269 }
1270
1271 func (t *Torrent) wantPeers() bool {
1272         if t.closed.IsSet() {
1273                 return false
1274         }
1275         if len(t.peers) > t.cl.config.TorrentPeersLowWater {
1276                 return false
1277         }
1278         return t.needData() || t.seeding()
1279 }
1280
1281 func (t *Torrent) updateWantPeersEvent() {
1282         if t.wantPeers() {
1283                 t.wantPeersEvent.Set()
1284         } else {
1285                 t.wantPeersEvent.Clear()
1286         }
1287 }
1288
1289 // Returns whether the client should make effort to seed the torrent.
1290 func (t *Torrent) seeding() bool {
1291         cl := t.cl
1292         if t.closed.IsSet() {
1293                 return false
1294         }
1295         if cl.config.NoUpload {
1296                 return false
1297         }
1298         if !cl.config.Seed {
1299                 return false
1300         }
1301         if cl.config.DisableAggressiveUpload && t.needData() {
1302                 return false
1303         }
1304         return true
1305 }
1306
1307 func (t *Torrent) startScrapingTracker(_url string) {
1308         if _url == "" {
1309                 return
1310         }
1311         u, _ := url.Parse(_url)
1312         if u.Scheme == "udp" {
1313                 u.Scheme = "udp4"
1314                 t.startScrapingTracker(u.String())
1315                 u.Scheme = "udp6"
1316                 t.startScrapingTracker(u.String())
1317                 return
1318         }
1319         if u.Scheme == "udp4" && (t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4) {
1320                 return
1321         }
1322         if u.Scheme == "udp6" && t.cl.config.DisableIPv6 {
1323                 return
1324         }
1325         if _, ok := t.trackerAnnouncers[_url]; ok {
1326                 return
1327         }
1328         newAnnouncer := &trackerScraper{
1329                 u: *u,
1330                 t: t,
1331         }
1332         if t.trackerAnnouncers == nil {
1333                 t.trackerAnnouncers = make(map[string]*trackerScraper)
1334         }
1335         t.trackerAnnouncers[_url] = newAnnouncer
1336         go newAnnouncer.Run()
1337 }
1338
1339 // Adds and starts tracker scrapers for tracker URLs that aren't already
1340 // running.
1341 func (t *Torrent) startMissingTrackerScrapers() {
1342         if t.cl.config.DisableTrackers {
1343                 return
1344         }
1345         t.startScrapingTracker(t.metainfo.Announce)
1346         for _, tier := range t.metainfo.AnnounceList {
1347                 for _, url := range tier {
1348                         t.startScrapingTracker(url)
1349                 }
1350         }
1351 }
1352
1353 // Returns an AnnounceRequest with fields filled out to defaults and current
1354 // values.
1355 func (t *Torrent) announceRequest() tracker.AnnounceRequest {
1356         return tracker.AnnounceRequest{
1357                 Event:    tracker.None,
1358                 NumWant:  -1,
1359                 Port:     uint16(t.cl.incomingPeerPort()),
1360                 PeerId:   t.cl.peerID,
1361                 InfoHash: t.infoHash,
1362                 Left:     t.bytesLeftAnnounce(),
1363                 Key:      t.cl.announceKey(),
1364         }
1365 }
1366
1367 // Adds peers revealed in an announce until the announce ends, or we have
1368 // enough peers.
1369 func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
1370         cl := t.cl
1371         // Count all the unique addresses we got during this announce.
1372         allAddrs := make(map[string]struct{})
1373         for {
1374                 select {
1375                 case v, ok := <-pvs:
1376                         if !ok {
1377                                 return
1378                         }
1379                         addPeers := make([]Peer, 0, len(v.Peers))
1380                         for _, cp := range v.Peers {
1381                                 if cp.Port == 0 {
1382                                         // Can't do anything with this.
1383                                         continue
1384                                 }
1385                                 addPeers = append(addPeers, Peer{
1386                                         IP:     cp.IP[:],
1387                                         Port:   cp.Port,
1388                                         Source: peerSourceDHTGetPeers,
1389                                 })
1390                                 key := (&net.UDPAddr{
1391                                         IP:   cp.IP[:],
1392                                         Port: cp.Port,
1393                                 }).String()
1394                                 allAddrs[key] = struct{}{}
1395                         }
1396                         cl.mu.Lock()
1397                         t.addPeers(addPeers)
1398                         numPeers := len(t.peers)
1399                         cl.mu.Unlock()
1400                         if numPeers >= cl.config.TorrentPeersHighWater {
1401                                 return
1402                         }
1403                 case <-t.closed.LockedChan(&cl.mu):
1404                         return
1405                 }
1406         }
1407 }
1408
1409 func (t *Torrent) announceDHT(impliedPort bool) (err error) {
1410         cl := t.cl
1411         ps, err := cl.dHT.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort)
1412         if err != nil {
1413                 return
1414         }
1415         t.consumeDHTAnnounce(ps.Peers)
1416         ps.Close()
1417         return
1418 }
1419
1420 func (t *Torrent) dhtAnnouncer() {
1421         cl := t.cl
1422         for {
1423                 select {
1424                 case <-t.wantPeersEvent.LockedChan(&cl.mu):
1425                 case <-t.closed.LockedChan(&cl.mu):
1426                         return
1427                 }
1428                 err := t.announceDHT(true)
1429                 func() {
1430                         cl.mu.Lock()
1431                         defer cl.mu.Unlock()
1432                         if err == nil {
1433                                 t.numDHTAnnounces++
1434                         } else {
1435                                 log.Printf("error announcing %q to DHT: %s", t, err)
1436                         }
1437                 }()
1438                 select {
1439                 case <-t.closed.LockedChan(&cl.mu):
1440                         return
1441                 case <-time.After(5 * time.Minute):
1442                 }
1443         }
1444 }
1445
1446 func (t *Torrent) addPeers(peers []Peer) {
1447         for _, p := range peers {
1448                 t.addPeer(p)
1449         }
1450 }
1451
1452 func (t *Torrent) Stats() TorrentStats {
1453         t.cl.mu.Lock()
1454         defer t.cl.mu.Unlock()
1455         return t.statsLocked()
1456 }
1457
1458 func (t *Torrent) statsLocked() TorrentStats {
1459         t.stats.ActivePeers = len(t.conns)
1460         t.stats.HalfOpenPeers = len(t.halfOpen)
1461         t.stats.PendingPeers = len(t.peers)
1462         t.stats.TotalPeers = t.numTotalPeers()
1463         t.stats.ConnectedSeeders = 0
1464         for c := range t.conns {
1465                 if all, ok := c.peerHasAllPieces(); all && ok {
1466                         t.stats.ConnectedSeeders++
1467                 }
1468         }
1469         return t.stats
1470 }
1471
1472 // The total number of peers in the torrent.
1473 func (t *Torrent) numTotalPeers() int {
1474         peers := make(map[string]struct{})
1475         for conn := range t.conns {
1476                 ra := conn.conn.RemoteAddr()
1477                 if ra == nil {
1478                         // It's been closed and doesn't support RemoteAddr.
1479                         continue
1480                 }
1481                 peers[ra.String()] = struct{}{}
1482         }
1483         for addr := range t.halfOpen {
1484                 peers[addr] = struct{}{}
1485         }
1486         for _, peer := range t.peers {
1487                 peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{}
1488         }
1489         return len(peers)
1490 }
1491
1492 // Reconcile bytes transferred before connection was associated with a
1493 // torrent.
1494 func (t *Torrent) reconcileHandshakeStats(c *connection) {
1495         t.stats.wroteBytes(c.stats.BytesWritten)
1496         t.stats.readBytes(c.stats.BytesRead)
1497 }
1498
1499 // Returns true if the connection is added.
1500 func (t *Torrent) addConnection(c *connection, outgoing bool) bool {
1501         if t.closed.IsSet() {
1502                 return false
1503         }
1504         if !t.wantConns() {
1505                 return false
1506         }
1507         for c0 := range t.conns {
1508                 if c.PeerID == c0.PeerID {
1509                         // Already connected to a client with that ID.
1510                         duplicateClientConns.Add(1)
1511                         lower := string(t.cl.peerID[:]) < string(c.PeerID[:])
1512                         // Retain the connection from initiated from lower peer ID to
1513                         // higher.
1514                         if outgoing == lower {
1515                                 // Close the other one.
1516                                 c0.Close()
1517                                 // TODO: Is it safe to delete from the map while we're
1518                                 // iterating over it?
1519                                 t.deleteConnection(c0)
1520                         } else {
1521                                 // Abandon this one.
1522                                 return false
1523                         }
1524                 }
1525         }
1526         if len(t.conns) >= t.maxEstablishedConns {
1527                 c := t.worstBadConn()
1528                 if c == nil {
1529                         return false
1530                 }
1531                 if t.cl.config.Debug && missinggo.CryHeard() {
1532                         log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
1533                 }
1534                 c.Close()
1535                 t.deleteConnection(c)
1536         }
1537         if len(t.conns) >= t.maxEstablishedConns {
1538                 panic(len(t.conns))
1539         }
1540         c.setTorrent(t)
1541         return true
1542 }
1543
1544 func (t *Torrent) wantConns() bool {
1545         if !t.networkingEnabled {
1546                 return false
1547         }
1548         if t.closed.IsSet() {
1549                 return false
1550         }
1551         if !t.seeding() && !t.needData() {
1552                 return false
1553         }
1554         if len(t.conns) < t.maxEstablishedConns {
1555                 return true
1556         }
1557         return t.worstBadConn() != nil
1558 }
1559
1560 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1561         t.cl.mu.Lock()
1562         defer t.cl.mu.Unlock()
1563         oldMax = t.maxEstablishedConns
1564         t.maxEstablishedConns = max
1565         wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
1566         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1567                 t.dropConnection(wcs.Pop().(*connection))
1568         }
1569         t.openNewConns()
1570         return oldMax
1571 }
1572
1573 func (t *Torrent) mu() missinggo.RWLocker {
1574         return &t.cl.mu
1575 }
1576
1577 func (t *Torrent) pieceHashed(piece int, correct bool) {
1578         log.Fmsg("hashed piece %d", piece).Add("piece", piece).Add("passed", correct).AddValue(debugLogValue).Log(t.logger)
1579         if t.closed.IsSet() {
1580                 return
1581         }
1582         p := &t.pieces[piece]
1583         touchers := t.reapPieceTouchers(piece)
1584         if p.storageCompletionOk {
1585                 // Don't score the first time a piece is hashed, it could be an
1586                 // initial check.
1587                 if correct {
1588                         pieceHashedCorrect.Add(1)
1589                 } else {
1590                         log.Printf("%s: piece %d (%s) failed hash: %d connections contributed", t, piece, p.hash, len(touchers))
1591                         pieceHashedNotCorrect.Add(1)
1592                 }
1593         }
1594         if correct {
1595                 if len(touchers) != 0 {
1596                         t.stats.PiecesDirtiedGood++
1597                 }
1598                 for _, c := range touchers {
1599                         c.stats.PiecesDirtiedGood++
1600                 }
1601                 err := p.Storage().MarkComplete()
1602                 if err != nil {
1603                         log.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
1604                 }
1605         } else {
1606                 if len(touchers) != 0 {
1607                         t.stats.PiecesDirtiedBad++
1608                         for _, c := range touchers {
1609                                 // Y u do dis peer?!
1610                                 c.stats.PiecesDirtiedBad++
1611                         }
1612                         slices.Sort(touchers, connLessTrusted)
1613                         if t.cl.config.Debug {
1614                                 log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int64) {
1615                                         for _, c := range touchers {
1616                                                 ret = append(ret, c.netGoodPiecesDirtied())
1617                                         }
1618                                         return
1619                                 }())
1620                         }
1621                         c := touchers[0]
1622                         t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
1623                         c.Drop()
1624                 }
1625                 t.onIncompletePiece(piece)
1626                 p.Storage().MarkNotComplete()
1627         }
1628         t.updatePieceCompletion(piece)
1629 }
1630
1631 func (t *Torrent) cancelRequestsForPiece(piece int) {
1632         // TODO: Make faster
1633         for cn := range t.conns {
1634                 cn.tickleWriter()
1635         }
1636 }
1637
1638 func (t *Torrent) onPieceCompleted(piece int) {
1639         t.pendAllChunkSpecs(piece)
1640         t.cancelRequestsForPiece(piece)
1641         for conn := range t.conns {
1642                 conn.Have(piece)
1643         }
1644 }
1645
1646 // Called when a piece is found to be not complete.
1647 func (t *Torrent) onIncompletePiece(piece int) {
1648         if t.pieceAllDirty(piece) {
1649                 t.pendAllChunkSpecs(piece)
1650         }
1651         if !t.wantPieceIndex(piece) {
1652                 // log.Printf("piece %d incomplete and unwanted", piece)
1653                 return
1654         }
1655         // We could drop any connections that we told we have a piece that we
1656         // don't here. But there's a test failure, and it seems clients don't care
1657         // if you request pieces that you already claim to have. Pruning bad
1658         // connections might just remove any connections that aren't treating us
1659         // favourably anyway.
1660
1661         // for c := range t.conns {
1662         //      if c.sentHave(piece) {
1663         //              c.Drop()
1664         //      }
1665         // }
1666         for conn := range t.conns {
1667                 if conn.PeerHasPiece(piece) {
1668                         conn.updateRequests()
1669                 }
1670         }
1671 }
1672
1673 func (t *Torrent) verifyPiece(piece int) {
1674         cl := t.cl
1675         cl.mu.Lock()
1676         defer cl.mu.Unlock()
1677         p := &t.pieces[piece]
1678         defer func() {
1679                 p.numVerifies++
1680                 cl.event.Broadcast()
1681         }()
1682         for p.hashing || t.storage == nil {
1683                 cl.event.Wait()
1684         }
1685         if !p.t.piecesQueuedForHash.Remove(piece) {
1686                 panic("piece was not queued")
1687         }
1688         if t.closed.IsSet() || t.pieceComplete(piece) {
1689                 t.updatePiecePriority(piece)
1690                 return
1691         }
1692         p.hashing = true
1693         t.publishPieceChange(piece)
1694         t.storageLock.RLock()
1695         cl.mu.Unlock()
1696         sum := t.hashPiece(piece)
1697         t.storageLock.RUnlock()
1698         cl.mu.Lock()
1699         p.hashing = false
1700         t.pieceHashed(piece, sum == p.hash)
1701         t.publishPieceChange(piece)
1702 }
1703
1704 // Return the connections that touched a piece, and clear the entries while
1705 // doing it.
1706 func (t *Torrent) reapPieceTouchers(piece int) (ret []*connection) {
1707         for c := range t.pieces[piece].dirtiers {
1708                 delete(c.peerTouchedPieces, piece)
1709                 ret = append(ret, c)
1710         }
1711         t.pieces[piece].dirtiers = nil
1712         return
1713 }
1714
1715 func (t *Torrent) connsAsSlice() (ret []*connection) {
1716         for c := range t.conns {
1717                 ret = append(ret, c)
1718         }
1719         return
1720 }
1721
1722 // Currently doesn't really queue, but should in the future.
1723 func (t *Torrent) queuePieceCheck(pieceIndex int) {
1724         piece := &t.pieces[pieceIndex]
1725         if piece.queuedForHash() {
1726                 return
1727         }
1728         t.piecesQueuedForHash.Add(pieceIndex)
1729         t.publishPieceChange(pieceIndex)
1730         go t.verifyPiece(pieceIndex)
1731 }
1732
1733 func (t *Torrent) VerifyData() {
1734         for i := range iter.N(t.NumPieces()) {
1735                 t.Piece(i).VerifyData()
1736         }
1737 }
1738
1739 // Start the process of connecting to the given peer for the given torrent if
1740 // appropriate.
1741 func (t *Torrent) initiateConn(peer Peer) {
1742         if peer.Id == t.cl.peerID {
1743                 return
1744         }
1745         if t.cl.badPeerIPPort(peer.IP, peer.Port) {
1746                 return
1747         }
1748         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
1749         if t.addrActive(addr) {
1750                 return
1751         }
1752         t.halfOpen[addr] = peer
1753         go t.cl.outgoingConnection(t, addr, peer.Source)
1754 }