]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
6a455e1ff4987e61923d4706433626b4dd943dde
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "crypto/sha1"
6         "errors"
7         "fmt"
8         "io"
9         "math"
10         "math/rand"
11         "net"
12         "net/url"
13         "os"
14         "sync"
15         "text/tabwriter"
16         "time"
17
18         "github.com/anacrolix/dht"
19         "github.com/anacrolix/log"
20         "github.com/anacrolix/missinggo"
21         "github.com/anacrolix/missinggo/bitmap"
22         "github.com/anacrolix/missinggo/perf"
23         "github.com/anacrolix/missinggo/prioritybitmap"
24         "github.com/anacrolix/missinggo/pubsub"
25         "github.com/anacrolix/missinggo/slices"
26         "github.com/anacrolix/torrent/bencode"
27         "github.com/anacrolix/torrent/metainfo"
28         pp "github.com/anacrolix/torrent/peer_protocol"
29         "github.com/anacrolix/torrent/storage"
30         "github.com/anacrolix/torrent/tracker"
31         "github.com/davecgh/go-spew/spew"
32 )
33
34 func (t *Torrent) chunkIndexSpec(chunkIndex pp.Integer, piece pieceIndex) chunkSpec {
35         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
36 }
37
38 type peersKey struct {
39         IPBytes string
40         Port    int
41 }
42
43 // Maintains state of torrent within a Client.
44 type Torrent struct {
45         // Torrent-level aggregate statistics. First in struct to ensure 64-bit
46         // alignment. See #262.
47         stats  ConnStats
48         cl     *Client
49         logger *log.Logger
50
51         networkingEnabled bool
52
53         // Determines what chunks to request from peers. 1: Favour higher priority
54         // pieces with some fuzzing to reduce overlaps and wastage across
55         // connections. 2: The fastest connection downloads strictly in order of
56         // priority, while all others adher to their piece inclications. 3:
57         // Requests are strictly by piece priority, and not duplicated until
58         // duplicateRequestTimeout is reached.
59         requestStrategy int
60         // How long to avoid duplicating a pending request.
61         duplicateRequestTimeout time.Duration
62
63         closed   missinggo.Event
64         infoHash metainfo.Hash
65         pieces   []Piece
66         // Values are the piece indices that changed.
67         pieceStateChanges *pubsub.PubSub
68         // The size of chunks to request from peers over the wire. This is
69         // normally 16KiB by convention these days.
70         chunkSize pp.Integer
71         chunkPool *sync.Pool
72         // Total length of the torrent in bytes. Stored because it's not O(1) to
73         // get this from the info dict.
74         length *int64
75
76         // The storage to open when the info dict becomes available.
77         storageOpener *storage.Client
78         // Storage for torrent data.
79         storage *storage.Torrent
80         // Read-locked for using storage, and write-locked for Closing.
81         storageLock sync.RWMutex
82
83         // TODO: Only announce stuff is used?
84         metainfo metainfo.MetaInfo
85
86         // The info dict. nil if we don't have it (yet).
87         info  *metainfo.Info
88         files *[]*File
89
90         // Active peer connections, running message stream loops. TODO: Make this
91         // open (not-closed) connections only.
92         conns               map[*connection]struct{}
93         maxEstablishedConns int
94         // Set of addrs to which we're attempting to connect. Connections are
95         // half-open until all handshakes are completed.
96         halfOpen    map[string]Peer
97         fastestConn *connection
98
99         // Reserve of peers to connect to. A peer can be both here and in the
100         // active connections if were told about the peer after connecting with
101         // them. That encourages us to reconnect to peers that are well known in
102         // the swarm.
103         peers          prioritizedPeers
104         wantPeersEvent missinggo.Event
105         // An announcer for each tracker URL.
106         trackerAnnouncers map[string]*trackerScraper
107         // How many times we've initiated a DHT announce. TODO: Move into stats.
108         numDHTAnnounces int
109
110         // Name used if the info name isn't available. Should be cleared when the
111         // Info does become available.
112         displayName string
113
114         // The bencoded bytes of the info dict. This is actively manipulated if
115         // the info bytes aren't initially available, and we try to fetch them
116         // from peers.
117         metadataBytes []byte
118         // Each element corresponds to the 16KiB metadata pieces. If true, we have
119         // received that piece.
120         metadataCompletedChunks []bool
121         metadataChanged         sync.Cond
122
123         // Set when .Info is obtained.
124         gotMetainfo missinggo.Event
125
126         readers               map[*reader]struct{}
127         readerNowPieces       bitmap.Bitmap
128         readerReadaheadPieces bitmap.Bitmap
129
130         // A cache of pieces we need to get. Calculated from various piece and
131         // file priorities and completion states elsewhere.
132         pendingPieces prioritybitmap.PriorityBitmap
133         // A cache of completed piece indices.
134         completedPieces bitmap.Bitmap
135         // Pieces that need to be hashed.
136         piecesQueuedForHash bitmap.Bitmap
137
138         // A pool of piece priorities []int for assignment to new connections.
139         // These "inclinations" are used to give connections preference for
140         // different pieces.
141         connPieceInclinationPool sync.Pool
142
143         // Count of each request across active connections.
144         pendingRequests map[request]int
145         // The last time we requested a chunk. Deleting the request from any
146         // connection will clear this value.
147         lastRequested map[request]*time.Timer
148 }
149
150 func (t *Torrent) tickleReaders() {
151         t.cl.event.Broadcast()
152 }
153
154 // Returns a channel that is closed when the Torrent is closed.
155 func (t *Torrent) Closed() <-chan struct{} {
156         return t.closed.LockedChan(t.cl.locker())
157 }
158
159 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
160 // pending, and half-open peers.
161 func (t *Torrent) KnownSwarm() (ks []Peer) {
162         // Add pending peers to the list
163         t.peers.Each(func(peer Peer) {
164                 ks = append(ks, peer)
165         })
166
167         // Add half-open peers to the list
168         for _, peer := range t.halfOpen {
169                 ks = append(ks, peer)
170         }
171
172         // Add active peers to the list
173         for conn := range t.conns {
174
175                 ks = append(ks, Peer{
176                         Id:     conn.PeerID,
177                         IP:     conn.remoteAddr.IP,
178                         Port:   int(conn.remoteAddr.Port),
179                         Source: conn.Discovery,
180                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
181                         // > But if we're not connected to them with an encrypted connection, I couldn't say
182                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
183                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
184                         // > connection. It's probably easiest to do the latter for now.
185                         // https://github.com/anacrolix/torrent/pull/188
186                         SupportsEncryption: conn.headerEncrypted,
187                 })
188         }
189
190         return
191 }
192
193 func (t *Torrent) setChunkSize(size pp.Integer) {
194         t.chunkSize = size
195         t.chunkPool = &sync.Pool{
196                 New: func() interface{} {
197                         b := make([]byte, size)
198                         return &b
199                 },
200         }
201 }
202
203 func (t *Torrent) setDisplayName(dn string) {
204         if t.haveInfo() {
205                 return
206         }
207         t.displayName = dn
208 }
209
210 func (t *Torrent) pieceComplete(piece pieceIndex) bool {
211         return t.completedPieces.Get(bitmap.BitIndex(piece))
212 }
213
214 func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
215         return t.pieces[piece].Storage().Completion()
216 }
217
218 // There's a connection to that address already.
219 func (t *Torrent) addrActive(addr string) bool {
220         if _, ok := t.halfOpen[addr]; ok {
221                 return true
222         }
223         for c := range t.conns {
224                 ra := c.remoteAddr
225                 if ra.String() == addr {
226                         return true
227                 }
228         }
229         return false
230 }
231
232 func (t *Torrent) unclosedConnsAsSlice() (ret []*connection) {
233         ret = make([]*connection, 0, len(t.conns))
234         for c := range t.conns {
235                 if !c.closed.IsSet() {
236                         ret = append(ret, c)
237                 }
238         }
239         return
240 }
241
242 func (t *Torrent) addPeer(p Peer) {
243         cl := t.cl
244         peersAddedBySource.Add(string(p.Source), 1)
245         if t.closed.IsSet() {
246                 return
247         }
248         if cl.badPeerIPPort(p.IP, p.Port) {
249                 torrent.Add("peers not added because of bad addr", 1)
250                 return
251         }
252         if t.peers.Add(p) {
253                 torrent.Add("peers replaced", 1)
254         }
255         t.openNewConns()
256         for t.peers.Len() > cl.config.TorrentPeersHighWater {
257                 _, ok := t.peers.DeleteMin()
258                 if ok {
259                         torrent.Add("excess reserve peers discarded", 1)
260                 }
261         }
262 }
263
264 func (t *Torrent) invalidateMetadata() {
265         for i := range t.metadataCompletedChunks {
266                 t.metadataCompletedChunks[i] = false
267         }
268         t.info = nil
269 }
270
271 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
272         if t.haveInfo() {
273                 return
274         }
275         if index >= len(t.metadataCompletedChunks) {
276                 log.Printf("%s: ignoring metadata piece %d", t, index)
277                 return
278         }
279         copy(t.metadataBytes[(1<<14)*index:], data)
280         t.metadataCompletedChunks[index] = true
281 }
282
283 func (t *Torrent) metadataPieceCount() int {
284         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
285 }
286
287 func (t *Torrent) haveMetadataPiece(piece int) bool {
288         if t.haveInfo() {
289                 return (1<<14)*piece < len(t.metadataBytes)
290         } else {
291                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
292         }
293 }
294
295 func (t *Torrent) metadataSizeKnown() bool {
296         return t.metadataBytes != nil
297 }
298
299 func (t *Torrent) metadataSize() int {
300         return len(t.metadataBytes)
301 }
302
303 func infoPieceHashes(info *metainfo.Info) (ret []string) {
304         for i := 0; i < len(info.Pieces); i += sha1.Size {
305                 ret = append(ret, string(info.Pieces[i:i+sha1.Size]))
306         }
307         return
308 }
309
310 func (t *Torrent) makePieces() {
311         hashes := infoPieceHashes(t.info)
312         t.pieces = make([]Piece, len(hashes), len(hashes))
313         for i, hash := range hashes {
314                 piece := &t.pieces[i]
315                 piece.t = t
316                 piece.index = pieceIndex(i)
317                 piece.noPendingWrites.L = &piece.pendingWritesMutex
318                 missinggo.CopyExact(piece.hash[:], hash)
319                 files := *t.files
320                 beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
321                 endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
322                 piece.files = files[beginFile:endFile]
323         }
324 }
325
326 // Returns the index of the first file containing the piece. files must be
327 // ordered by offset.
328 func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
329         for i, f := range files {
330                 if f.offset+f.length > pieceOffset {
331                         return i
332                 }
333         }
334         return 0
335 }
336
337 // Returns the index after the last file containing the piece. files must be
338 // ordered by offset.
339 func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
340         for i, f := range files {
341                 if f.offset+f.length >= pieceEndOffset {
342                         return i + 1
343                 }
344         }
345         return 0
346 }
347
348 func (t *Torrent) cacheLength() {
349         var l int64
350         for _, f := range t.info.UpvertedFiles() {
351                 l += f.Length
352         }
353         t.length = &l
354 }
355
356 func (t *Torrent) setInfo(info *metainfo.Info) error {
357         if err := validateInfo(info); err != nil {
358                 return fmt.Errorf("bad info: %s", err)
359         }
360         if t.storageOpener != nil {
361                 var err error
362                 t.storage, err = t.storageOpener.OpenTorrent(info, t.infoHash)
363                 if err != nil {
364                         return fmt.Errorf("error opening torrent storage: %s", err)
365                 }
366         }
367         t.info = info
368         t.displayName = "" // Save a few bytes lol.
369         t.initFiles()
370         t.cacheLength()
371         t.makePieces()
372         return nil
373 }
374
375 func (t *Torrent) onSetInfo() {
376         for conn := range t.conns {
377                 if err := conn.setNumPieces(t.numPieces()); err != nil {
378                         log.Printf("closing connection: %s", err)
379                         conn.Close()
380                 }
381         }
382         for i := range t.pieces {
383                 t.updatePieceCompletion(pieceIndex(i))
384                 p := &t.pieces[i]
385                 if !p.storageCompletionOk {
386                         // log.Printf("piece %s completion unknown, queueing check", p)
387                         t.queuePieceCheck(pieceIndex(i))
388                 }
389         }
390         t.cl.event.Broadcast()
391         t.gotMetainfo.Set()
392         t.updateWantPeersEvent()
393         t.pendingRequests = make(map[request]int)
394         t.lastRequested = make(map[request]*time.Timer)
395 }
396
397 // Called when metadata for a torrent becomes available.
398 func (t *Torrent) setInfoBytes(b []byte) error {
399         if metainfo.HashBytes(b) != t.infoHash {
400                 return errors.New("info bytes have wrong hash")
401         }
402         var info metainfo.Info
403         if err := bencode.Unmarshal(b, &info); err != nil {
404                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
405         }
406         if err := t.setInfo(&info); err != nil {
407                 return err
408         }
409         t.metadataBytes = b
410         t.metadataCompletedChunks = nil
411         t.onSetInfo()
412         return nil
413 }
414
415 func (t *Torrent) haveAllMetadataPieces() bool {
416         if t.haveInfo() {
417                 return true
418         }
419         if t.metadataCompletedChunks == nil {
420                 return false
421         }
422         for _, have := range t.metadataCompletedChunks {
423                 if !have {
424                         return false
425                 }
426         }
427         return true
428 }
429
430 // TODO: Propagate errors to disconnect peer.
431 func (t *Torrent) setMetadataSize(bytes int) (err error) {
432         if t.haveInfo() {
433                 // We already know the correct metadata size.
434                 return
435         }
436         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
437                 return errors.New("bad size")
438         }
439         if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
440                 return
441         }
442         t.metadataBytes = make([]byte, bytes)
443         t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
444         t.metadataChanged.Broadcast()
445         for c := range t.conns {
446                 c.requestPendingMetadata()
447         }
448         return
449 }
450
451 // The current working name for the torrent. Either the name in the info dict,
452 // or a display name given such as by the dn value in a magnet link, or "".
453 func (t *Torrent) name() string {
454         if t.haveInfo() {
455                 return t.info.Name
456         }
457         return t.displayName
458 }
459
460 func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) {
461         p := &t.pieces[index]
462         ret.Priority = t.piecePriority(index)
463         ret.Completion = p.completion()
464         if p.queuedForHash() || p.hashing {
465                 ret.Checking = true
466         }
467         if !ret.Complete && t.piecePartiallyDownloaded(index) {
468                 ret.Partial = true
469         }
470         return
471 }
472
473 func (t *Torrent) metadataPieceSize(piece int) int {
474         return metadataPieceSize(len(t.metadataBytes), piece)
475 }
476
477 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
478         d := map[string]int{
479                 "msg_type": msgType,
480                 "piece":    piece,
481         }
482         if data != nil {
483                 d["total_size"] = len(t.metadataBytes)
484         }
485         p := bencode.MustMarshal(d)
486         return pp.Message{
487                 Type:            pp.Extended,
488                 ExtendedID:      c.PeerExtensionIDs[pp.ExtensionNameMetadata],
489                 ExtendedPayload: append(p, data...),
490         }
491 }
492
493 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
494         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
495                 ret = append(ret, PieceStateRun{
496                         PieceState: el.(PieceState),
497                         Length:     int(count),
498                 })
499         })
500         for index := range t.pieces {
501                 rle.Append(t.pieceState(pieceIndex(index)), 1)
502         }
503         rle.Flush()
504         return
505 }
506
507 // Produces a small string representing a PieceStateRun.
508 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
509         ret = fmt.Sprintf("%d", psr.Length)
510         ret += func() string {
511                 switch psr.Priority {
512                 case PiecePriorityNext:
513                         return "N"
514                 case PiecePriorityNormal:
515                         return "."
516                 case PiecePriorityReadahead:
517                         return "R"
518                 case PiecePriorityNow:
519                         return "!"
520                 case PiecePriorityHigh:
521                         return "H"
522                 default:
523                         return ""
524                 }
525         }()
526         if psr.Checking {
527                 ret += "H"
528         }
529         if psr.Partial {
530                 ret += "P"
531         }
532         if psr.Complete {
533                 ret += "C"
534         }
535         if !psr.Ok {
536                 ret += "?"
537         }
538         return
539 }
540
541 func (t *Torrent) writeStatus(w io.Writer) {
542         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
543         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
544         if !t.haveInfo() {
545                 fmt.Fprintf(w, "Metadata have: ")
546                 for _, h := range t.metadataCompletedChunks {
547                         fmt.Fprintf(w, "%c", func() rune {
548                                 if h {
549                                         return 'H'
550                                 } else {
551                                         return '.'
552                                 }
553                         }())
554                 }
555                 fmt.Fprintln(w)
556         }
557         fmt.Fprintf(w, "Piece length: %s\n", func() string {
558                 if t.haveInfo() {
559                         return fmt.Sprint(t.usualPieceSize())
560                 } else {
561                         return "?"
562                 }
563         }())
564         if t.info != nil {
565                 fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
566                 fmt.Fprint(w, "Piece States:")
567                 for _, psr := range t.pieceStateRuns() {
568                         w.Write([]byte(" "))
569                         w.Write([]byte(pieceStateRunStatusChars(psr)))
570                 }
571                 fmt.Fprintln(w)
572         }
573         fmt.Fprintf(w, "Reader Pieces:")
574         t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
575                 fmt.Fprintf(w, " %d:%d", begin, end)
576                 return true
577         })
578         fmt.Fprintln(w)
579
580         fmt.Fprintf(w, "Enabled trackers:\n")
581         func() {
582                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
583                 fmt.Fprintf(tw, "    URL\tNext announce\tLast announce\n")
584                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
585                         return l.u.String() < r.u.String()
586                 }).([]*trackerScraper) {
587                         fmt.Fprintf(tw, "    %s\n", ta.statusLine())
588                 }
589                 tw.Flush()
590         }()
591
592         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
593
594         spew.NewDefaultConfig()
595         spew.Fdump(w, t.statsLocked())
596
597         conns := t.connsAsSlice()
598         slices.Sort(conns, worseConn)
599         for i, c := range conns {
600                 fmt.Fprintf(w, "%2d. ", i+1)
601                 c.WriteStatus(w, t)
602         }
603 }
604
605 func (t *Torrent) haveInfo() bool {
606         return t.info != nil
607 }
608
609 // Returns a run-time generated MetaInfo that includes the info bytes and
610 // announce-list as currently known to the client.
611 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
612         return metainfo.MetaInfo{
613                 CreationDate: time.Now().Unix(),
614                 Comment:      "dynamic metainfo from client",
615                 CreatedBy:    "go.torrent",
616                 AnnounceList: t.metainfo.UpvertedAnnounceList(),
617                 InfoBytes: func() []byte {
618                         if t.haveInfo() {
619                                 return t.metadataBytes
620                         } else {
621                                 return nil
622                         }
623                 }(),
624         }
625 }
626
627 func (t *Torrent) BytesMissing() int64 {
628         t.cl.rLock()
629         defer t.cl.rUnlock()
630         return t.bytesMissingLocked()
631 }
632
633 func (t *Torrent) bytesMissingLocked() int64 {
634         return t.bytesLeft()
635 }
636
637 func (t *Torrent) bytesLeft() (left int64) {
638         bitmap.Flip(t.completedPieces, 0, bitmap.BitIndex(t.numPieces())).IterTyped(func(piece int) bool {
639                 p := &t.pieces[piece]
640                 left += int64(p.length() - p.numDirtyBytes())
641                 return true
642         })
643         return
644 }
645
646 // Bytes left to give in tracker announces.
647 func (t *Torrent) bytesLeftAnnounce() uint64 {
648         if t.haveInfo() {
649                 return uint64(t.bytesLeft())
650         } else {
651                 return math.MaxUint64
652         }
653 }
654
655 func (t *Torrent) piecePartiallyDownloaded(piece pieceIndex) bool {
656         if t.pieceComplete(piece) {
657                 return false
658         }
659         if t.pieceAllDirty(piece) {
660                 return false
661         }
662         return t.pieces[piece].hasDirtyChunks()
663 }
664
665 func (t *Torrent) usualPieceSize() int {
666         return int(t.info.PieceLength)
667 }
668
669 func (t *Torrent) numPieces() pieceIndex {
670         return pieceIndex(t.info.NumPieces())
671 }
672
673 func (t *Torrent) numPiecesCompleted() (num int) {
674         return t.completedPieces.Len()
675 }
676
677 func (t *Torrent) close() (err error) {
678         t.closed.Set()
679         t.tickleReaders()
680         if t.storage != nil {
681                 t.storageLock.Lock()
682                 t.storage.Close()
683                 t.storageLock.Unlock()
684         }
685         for conn := range t.conns {
686                 conn.Close()
687         }
688         t.cl.event.Broadcast()
689         t.pieceStateChanges.Close()
690         t.updateWantPeersEvent()
691         return
692 }
693
694 func (t *Torrent) requestOffset(r request) int64 {
695         return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
696 }
697
698 // Return the request that would include the given offset into the torrent
699 // data. Returns !ok if there is no such request.
700 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
701         return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
702 }
703
704 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
705         defer perf.ScopeTimerErr(&err)()
706         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
707         if err == nil && n != len(data) {
708                 err = io.ErrShortWrite
709         }
710         return
711 }
712
713 func (t *Torrent) bitfield() (bf []bool) {
714         bf = make([]bool, t.numPieces())
715         t.completedPieces.IterTyped(func(piece int) (again bool) {
716                 bf[piece] = true
717                 return true
718         })
719         return
720 }
721
722 func (t *Torrent) pieceNumChunks(piece pieceIndex) pp.Integer {
723         return (t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize
724 }
725
726 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
727         t.pieces[pieceIndex].dirtyChunks.Clear()
728 }
729
730 func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
731         if t.info.PieceLength == 0 {
732                 // There will be no variance amongst pieces. Only pain.
733                 return 0
734         }
735         if piece == t.numPieces()-1 {
736                 ret := pp.Integer(*t.length % t.info.PieceLength)
737                 if ret != 0 {
738                         return ret
739                 }
740         }
741         return pp.Integer(t.info.PieceLength)
742 }
743
744 func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash) {
745         hash := pieceHash.New()
746         p := &t.pieces[piece]
747         p.waitNoPendingWrites()
748         ip := t.info.Piece(int(piece))
749         pl := ip.Length()
750         n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
751         if n == pl {
752                 missinggo.CopyExact(&ret, hash.Sum(nil))
753                 return
754         }
755         if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
756                 log.Printf("unexpected error hashing piece with %T: %s", t.storage.TorrentImpl, err)
757         }
758         return
759 }
760
761 func (t *Torrent) haveAnyPieces() bool {
762         return t.completedPieces.Len() != 0
763 }
764
765 func (t *Torrent) haveAllPieces() bool {
766         if !t.haveInfo() {
767                 return false
768         }
769         return t.completedPieces.Len() == bitmap.BitIndex(t.numPieces())
770 }
771
772 func (t *Torrent) havePiece(index pieceIndex) bool {
773         return t.haveInfo() && t.pieceComplete(index)
774 }
775
776 func (t *Torrent) haveChunk(r request) (ret bool) {
777         // defer func() {
778         //      log.Println("have chunk", r, ret)
779         // }()
780         if !t.haveInfo() {
781                 return false
782         }
783         if t.pieceComplete(pieceIndex(r.Index)) {
784                 return true
785         }
786         p := &t.pieces[r.Index]
787         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
788 }
789
790 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
791         return int(cs.Begin / chunkSize)
792 }
793
794 func (t *Torrent) wantPiece(r request) bool {
795         if !t.wantPieceIndex(pieceIndex(r.Index)) {
796                 return false
797         }
798         if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
799                 return true
800         }
801         // TODO: What about pieces that were wanted, but aren't now, and aren't
802         // completed either? That used to be done here.
803         return false
804 }
805
806 func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
807         if !t.haveInfo() {
808                 return false
809         }
810         if index < 0 || index >= t.numPieces() {
811                 return false
812         }
813         p := &t.pieces[index]
814         if p.queuedForHash() {
815                 return false
816         }
817         if p.hashing {
818                 return false
819         }
820         if t.pieceComplete(index) {
821                 return false
822         }
823         if t.pendingPieces.Contains(bitmap.BitIndex(index)) {
824                 return true
825         }
826         // log.Printf("piece %d not pending", index)
827         return !t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
828                 return index < begin || index >= end
829         })
830 }
831
832 // The worst connection is one that hasn't been sent, or sent anything useful
833 // for the longest. A bad connection is one that usually sends us unwanted
834 // pieces, or has been in worser half of the established connections for more
835 // than a minute.
836 func (t *Torrent) worstBadConn() *connection {
837         wcs := worseConnSlice{t.unclosedConnsAsSlice()}
838         heap.Init(&wcs)
839         for wcs.Len() != 0 {
840                 c := heap.Pop(&wcs).(*connection)
841                 if c.stats.ChunksReadWasted.Int64() >= 6 && c.stats.ChunksReadWasted.Int64() > c.stats.ChunksReadUseful.Int64() {
842                         return c
843                 }
844                 // If the connection is in the worst half of the established
845                 // connection quota and is older than a minute.
846                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
847                         // Give connections 1 minute to prove themselves.
848                         if time.Since(c.completedHandshake) > time.Minute {
849                                 return c
850                         }
851                 }
852         }
853         return nil
854 }
855
856 type PieceStateChange struct {
857         Index int
858         PieceState
859 }
860
861 func (t *Torrent) publishPieceChange(piece pieceIndex) {
862         cur := t.pieceState(piece)
863         p := &t.pieces[piece]
864         if cur != p.publicPieceState {
865                 p.publicPieceState = cur
866                 t.pieceStateChanges.Publish(PieceStateChange{
867                         int(piece),
868                         cur,
869                 })
870         }
871 }
872
873 func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
874         if t.pieceComplete(piece) {
875                 return 0
876         }
877         return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
878 }
879
880 func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
881         return t.pieces[piece].dirtyChunks.Len() == int(t.pieceNumChunks(piece))
882 }
883
884 func (t *Torrent) readersChanged() {
885         t.updateReaderPieces()
886         t.updateAllPiecePriorities()
887 }
888
889 func (t *Torrent) updateReaderPieces() {
890         t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities()
891 }
892
893 func (t *Torrent) readerPosChanged(from, to pieceRange) {
894         if from == to {
895                 return
896         }
897         t.updateReaderPieces()
898         // Order the ranges, high and low.
899         l, h := from, to
900         if l.begin > h.begin {
901                 l, h = h, l
902         }
903         if l.end < h.begin {
904                 // Two distinct ranges.
905                 t.updatePiecePriorities(l.begin, l.end)
906                 t.updatePiecePriorities(h.begin, h.end)
907         } else {
908                 // Ranges overlap.
909                 end := l.end
910                 if h.end > end {
911                         end = h.end
912                 }
913                 t.updatePiecePriorities(l.begin, end)
914         }
915 }
916
917 func (t *Torrent) maybeNewConns() {
918         // Tickle the accept routine.
919         t.cl.event.Broadcast()
920         t.openNewConns()
921 }
922
923 func (t *Torrent) piecePriorityChanged(piece pieceIndex) {
924         // log.Printf("piece %d priority changed", piece)
925         for c := range t.conns {
926                 if c.updatePiecePriority(piece) {
927                         // log.Print("conn piece priority changed")
928                         c.updateRequests()
929                 }
930         }
931         t.maybeNewConns()
932         t.publishPieceChange(piece)
933 }
934
935 func (t *Torrent) updatePiecePriority(piece pieceIndex) {
936         p := &t.pieces[piece]
937         newPrio := p.uncachedPriority()
938         // log.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
939         if newPrio == PiecePriorityNone {
940                 if !t.pendingPieces.Remove(bitmap.BitIndex(piece)) {
941                         return
942                 }
943         } else {
944                 if !t.pendingPieces.Set(bitmap.BitIndex(piece), newPrio.BitmapPriority()) {
945                         return
946                 }
947         }
948         t.piecePriorityChanged(piece)
949 }
950
951 func (t *Torrent) updateAllPiecePriorities() {
952         t.updatePiecePriorities(0, t.numPieces())
953 }
954
955 // Update all piece priorities in one hit. This function should have the same
956 // output as updatePiecePriority, but across all pieces.
957 func (t *Torrent) updatePiecePriorities(begin, end pieceIndex) {
958         for i := begin; i < end; i++ {
959                 t.updatePiecePriority(i)
960         }
961 }
962
963 // Returns the range of pieces [begin, end) that contains the extent of bytes.
964 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
965         if off >= *t.length {
966                 return
967         }
968         if off < 0 {
969                 size += off
970                 off = 0
971         }
972         if size <= 0 {
973                 return
974         }
975         begin = pieceIndex(off / t.info.PieceLength)
976         end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
977         if end > pieceIndex(t.info.NumPieces()) {
978                 end = pieceIndex(t.info.NumPieces())
979         }
980         return
981 }
982
983 // Returns true if all iterations complete without breaking. Returns the read
984 // regions for all readers. The reader regions should not be merged as some
985 // callers depend on this method to enumerate readers.
986 func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool)) (all bool) {
987         for r := range t.readers {
988                 p := r.pieces
989                 if p.begin >= p.end {
990                         continue
991                 }
992                 if !f(p.begin, p.end) {
993                         return false
994                 }
995         }
996         return true
997 }
998
999 func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
1000         prio, ok := t.pendingPieces.GetPriority(bitmap.BitIndex(piece))
1001         if !ok {
1002                 return PiecePriorityNone
1003         }
1004         if prio > 0 {
1005                 panic(prio)
1006         }
1007         ret := piecePriority(-prio)
1008         if ret == PiecePriorityNone {
1009                 panic(piece)
1010         }
1011         return ret
1012 }
1013
1014 func (t *Torrent) pendRequest(req request) {
1015         ci := chunkIndex(req.chunkSpec, t.chunkSize)
1016         t.pieces[req.Index].pendChunkIndex(ci)
1017 }
1018
1019 func (t *Torrent) pieceCompletionChanged(piece pieceIndex) {
1020         log.Call().Add("piece", piece).AddValue(debugLogValue).Log(t.logger)
1021         t.cl.event.Broadcast()
1022         if t.pieceComplete(piece) {
1023                 t.onPieceCompleted(piece)
1024         } else {
1025                 t.onIncompletePiece(piece)
1026         }
1027         t.updatePiecePriority(piece)
1028 }
1029
1030 func (t *Torrent) numReceivedConns() (ret int) {
1031         for c := range t.conns {
1032                 if c.Discovery == peerSourceIncoming {
1033                         ret++
1034                 }
1035         }
1036         return
1037 }
1038
1039 func (t *Torrent) maxHalfOpen() int {
1040         // Note that if we somehow exceed the maximum established conns, we want
1041         // the negative value to have an effect.
1042         establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
1043         extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
1044         // We want to allow some experimentation with new peers, and to try to
1045         // upset an oversupply of received connections.
1046         return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.config.HalfOpenConnsPerTorrent)))
1047 }
1048
1049 func (t *Torrent) openNewConns() {
1050         defer t.updateWantPeersEvent()
1051         for t.peers.Len() != 0 {
1052                 if !t.wantConns() {
1053                         return
1054                 }
1055                 if len(t.halfOpen) >= t.maxHalfOpen() {
1056                         return
1057                 }
1058                 p := t.peers.PopMax()
1059                 t.initiateConn(p)
1060         }
1061 }
1062
1063 func (t *Torrent) getConnPieceInclination() []int {
1064         _ret := t.connPieceInclinationPool.Get()
1065         if _ret == nil {
1066                 pieceInclinationsNew.Add(1)
1067                 return rand.Perm(int(t.numPieces()))
1068         }
1069         pieceInclinationsReused.Add(1)
1070         return *_ret.(*[]int)
1071 }
1072
1073 func (t *Torrent) putPieceInclination(pi []int) {
1074         t.connPieceInclinationPool.Put(&pi)
1075         pieceInclinationsPut.Add(1)
1076 }
1077
1078 func (t *Torrent) updatePieceCompletion(piece pieceIndex) {
1079         pcu := t.pieceCompleteUncached(piece)
1080         p := &t.pieces[piece]
1081         changed := t.completedPieces.Get(bitmap.BitIndex(piece)) != pcu.Complete || p.storageCompletionOk != pcu.Ok
1082         log.Fmsg("piece %d completion: %v", piece, pcu.Ok).AddValue(debugLogValue).Log(t.logger)
1083         p.storageCompletionOk = pcu.Ok
1084         t.completedPieces.Set(bitmap.BitIndex(piece), pcu.Complete)
1085         t.tickleReaders()
1086         // log.Printf("piece %d uncached completion: %v", piece, pcu.Complete)
1087         // log.Printf("piece %d changed: %v", piece, changed)
1088         if changed {
1089                 t.pieceCompletionChanged(piece)
1090         }
1091 }
1092
1093 // Non-blocking read. Client lock is not required.
1094 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1095         p := &t.pieces[off/t.info.PieceLength]
1096         p.waitNoPendingWrites()
1097         return p.Storage().ReadAt(b, off-p.Info().Offset())
1098 }
1099
1100 func (t *Torrent) updateAllPieceCompletions() {
1101         for i := pieceIndex(0); i < t.numPieces(); i++ {
1102                 t.updatePieceCompletion(i)
1103         }
1104 }
1105
1106 // Returns an error if the metadata was completed, but couldn't be set for
1107 // some reason. Blame it on the last peer to contribute.
1108 func (t *Torrent) maybeCompleteMetadata() error {
1109         if t.haveInfo() {
1110                 // Nothing to do.
1111                 return nil
1112         }
1113         if !t.haveAllMetadataPieces() {
1114                 // Don't have enough metadata pieces.
1115                 return nil
1116         }
1117         err := t.setInfoBytes(t.metadataBytes)
1118         if err != nil {
1119                 t.invalidateMetadata()
1120                 return fmt.Errorf("error setting info bytes: %s", err)
1121         }
1122         if t.cl.config.Debug {
1123                 log.Printf("%s: got metadata from peers", t)
1124         }
1125         return nil
1126 }
1127
1128 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1129         t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
1130                 ret.AddRange(bitmap.BitIndex(begin), bitmap.BitIndex(end))
1131                 return true
1132         })
1133         return
1134 }
1135
1136 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1137         t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
1138                 if end > begin {
1139                         now.Add(bitmap.BitIndex(begin))
1140                         readahead.AddRange(bitmap.BitIndex(begin)+1, bitmap.BitIndex(end))
1141                 }
1142                 return true
1143         })
1144         return
1145 }
1146
1147 func (t *Torrent) needData() bool {
1148         if t.closed.IsSet() {
1149                 return false
1150         }
1151         if !t.haveInfo() {
1152                 return true
1153         }
1154         return t.pendingPieces.Len() != 0
1155 }
1156
1157 func appendMissingStrings(old, new []string) (ret []string) {
1158         ret = old
1159 new:
1160         for _, n := range new {
1161                 for _, o := range old {
1162                         if o == n {
1163                                 continue new
1164                         }
1165                 }
1166                 ret = append(ret, n)
1167         }
1168         return
1169 }
1170
1171 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1172         ret = existing
1173         for minNumTiers > len(ret) {
1174                 ret = append(ret, nil)
1175         }
1176         return
1177 }
1178
1179 func (t *Torrent) addTrackers(announceList [][]string) {
1180         fullAnnounceList := &t.metainfo.AnnounceList
1181         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1182         for tierIndex, trackerURLs := range announceList {
1183                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1184         }
1185         t.startMissingTrackerScrapers()
1186         t.updateWantPeersEvent()
1187 }
1188
1189 // Don't call this before the info is available.
1190 func (t *Torrent) bytesCompleted() int64 {
1191         if !t.haveInfo() {
1192                 return 0
1193         }
1194         return t.info.TotalLength() - t.bytesLeft()
1195 }
1196
1197 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1198         t.cl.lock()
1199         defer t.cl.unlock()
1200         return t.setInfoBytes(b)
1201 }
1202
1203 // Returns true if connection is removed from torrent.Conns.
1204 func (t *Torrent) deleteConnection(c *connection) (ret bool) {
1205         if !c.closed.IsSet() {
1206                 panic("connection is not closed")
1207                 // There are behaviours prevented by the closed state that will fail
1208                 // if the connection has been deleted.
1209         }
1210         _, ret = t.conns[c]
1211         delete(t.conns, c)
1212         torrent.Add("deleted connections", 1)
1213         c.deleteAllRequests()
1214         if len(t.conns) == 0 {
1215                 t.assertNoPendingRequests()
1216         }
1217         return
1218 }
1219
1220 func (t *Torrent) assertNoPendingRequests() {
1221         if len(t.pendingRequests) != 0 {
1222                 panic(t.pendingRequests)
1223         }
1224         if len(t.lastRequested) != 0 {
1225                 panic(t.lastRequested)
1226         }
1227 }
1228
1229 func (t *Torrent) dropConnection(c *connection) {
1230         t.cl.event.Broadcast()
1231         c.Close()
1232         if t.deleteConnection(c) {
1233                 t.openNewConns()
1234         }
1235 }
1236
1237 func (t *Torrent) wantPeers() bool {
1238         if t.closed.IsSet() {
1239                 return false
1240         }
1241         if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
1242                 return false
1243         }
1244         return t.needData() || t.seeding()
1245 }
1246
1247 func (t *Torrent) updateWantPeersEvent() {
1248         if t.wantPeers() {
1249                 t.wantPeersEvent.Set()
1250         } else {
1251                 t.wantPeersEvent.Clear()
1252         }
1253 }
1254
1255 // Returns whether the client should make effort to seed the torrent.
1256 func (t *Torrent) seeding() bool {
1257         cl := t.cl
1258         if t.closed.IsSet() {
1259                 return false
1260         }
1261         if cl.config.NoUpload {
1262                 return false
1263         }
1264         if !cl.config.Seed {
1265                 return false
1266         }
1267         if cl.config.DisableAggressiveUpload && t.needData() {
1268                 return false
1269         }
1270         return true
1271 }
1272
1273 func (t *Torrent) startScrapingTracker(_url string) {
1274         if _url == "" {
1275                 return
1276         }
1277         u, err := url.Parse(_url)
1278         if err != nil {
1279                 log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger)
1280                 // TODO: Handle urls with leading '*', some kind of silly uTorrent
1281                 // convention?
1282                 return
1283         }
1284         if u.Scheme == "udp" {
1285                 u.Scheme = "udp4"
1286                 t.startScrapingTracker(u.String())
1287                 u.Scheme = "udp6"
1288                 t.startScrapingTracker(u.String())
1289                 return
1290         }
1291         if u.Scheme == "udp4" && (t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4) {
1292                 return
1293         }
1294         if u.Scheme == "udp6" && t.cl.config.DisableIPv6 {
1295                 return
1296         }
1297         if _, ok := t.trackerAnnouncers[_url]; ok {
1298                 return
1299         }
1300         newAnnouncer := &trackerScraper{
1301                 u: *u,
1302                 t: t,
1303         }
1304         if t.trackerAnnouncers == nil {
1305                 t.trackerAnnouncers = make(map[string]*trackerScraper)
1306         }
1307         t.trackerAnnouncers[_url] = newAnnouncer
1308         go newAnnouncer.Run()
1309 }
1310
1311 // Adds and starts tracker scrapers for tracker URLs that aren't already
1312 // running.
1313 func (t *Torrent) startMissingTrackerScrapers() {
1314         if t.cl.config.DisableTrackers {
1315                 return
1316         }
1317         t.startScrapingTracker(t.metainfo.Announce)
1318         for _, tier := range t.metainfo.AnnounceList {
1319                 for _, url := range tier {
1320                         t.startScrapingTracker(url)
1321                 }
1322         }
1323 }
1324
1325 // Returns an AnnounceRequest with fields filled out to defaults and current
1326 // values.
1327 func (t *Torrent) announceRequest() tracker.AnnounceRequest {
1328         // Note that IPAddress is not set. It's set for UDP inside the tracker
1329         // code, since it's dependent on the network in use.
1330         return tracker.AnnounceRequest{
1331                 Event:    tracker.None,
1332                 NumWant:  -1,
1333                 Port:     uint16(t.cl.incomingPeerPort()),
1334                 PeerId:   t.cl.peerID,
1335                 InfoHash: t.infoHash,
1336                 Key:      t.cl.announceKey(),
1337
1338                 // The following are vaguely described in BEP 3.
1339
1340                 Left:     t.bytesLeftAnnounce(),
1341                 Uploaded: t.stats.BytesWrittenData.Int64(),
1342                 // There's no mention of wasted or unwanted download in the BEP.
1343                 Downloaded: t.stats.BytesReadUsefulData.Int64(),
1344         }
1345 }
1346
1347 // Adds peers revealed in an announce until the announce ends, or we have
1348 // enough peers.
1349 func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
1350         cl := t.cl
1351         // Count all the unique addresses we got during this announce.
1352         allAddrs := make(map[string]struct{})
1353         for {
1354                 select {
1355                 case v, ok := <-pvs:
1356                         if !ok {
1357                                 return
1358                         }
1359                         addPeers := make([]Peer, 0, len(v.Peers))
1360                         for _, cp := range v.Peers {
1361                                 if cp.Port == 0 {
1362                                         // Can't do anything with this.
1363                                         continue
1364                                 }
1365                                 addPeers = append(addPeers, Peer{
1366                                         IP:     cp.IP[:],
1367                                         Port:   cp.Port,
1368                                         Source: peerSourceDHTGetPeers,
1369                                 })
1370                                 key := (&net.UDPAddr{
1371                                         IP:   cp.IP[:],
1372                                         Port: cp.Port,
1373                                 }).String()
1374                                 allAddrs[key] = struct{}{}
1375                         }
1376                         cl.lock()
1377                         t.addPeers(addPeers)
1378                         numPeers := t.peers.Len()
1379                         cl.unlock()
1380                         if numPeers >= cl.config.TorrentPeersHighWater {
1381                                 return
1382                         }
1383                 case <-t.closed.LockedChan(cl.locker()):
1384                         return
1385                 }
1386         }
1387 }
1388
1389 func (t *Torrent) announceDHT(impliedPort bool, s *dht.Server) (err error) {
1390         cl := t.cl
1391         ps, err := s.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort)
1392         if err != nil {
1393                 return
1394         }
1395         t.consumeDHTAnnounce(ps.Peers)
1396         ps.Close()
1397         return
1398 }
1399
1400 func (t *Torrent) dhtAnnouncer(s *dht.Server) {
1401         cl := t.cl
1402         for {
1403                 select {
1404                 case <-t.wantPeersEvent.LockedChan(cl.locker()):
1405                 case <-t.closed.LockedChan(cl.locker()):
1406                         return
1407                 }
1408                 err := t.announceDHT(true, s)
1409                 func() {
1410                         cl.lock()
1411                         defer cl.unlock()
1412                         if err == nil {
1413                                 t.numDHTAnnounces++
1414                         } else {
1415                                 log.Printf("error announcing %q to DHT: %s", t, err)
1416                         }
1417                 }()
1418                 select {
1419                 case <-t.closed.LockedChan(cl.locker()):
1420                         return
1421                 case <-time.After(5 * time.Minute):
1422                 }
1423         }
1424 }
1425
1426 func (t *Torrent) addPeers(peers []Peer) {
1427         for _, p := range peers {
1428                 t.addPeer(p)
1429         }
1430 }
1431
1432 func (t *Torrent) Stats() TorrentStats {
1433         t.cl.rLock()
1434         defer t.cl.rUnlock()
1435         return t.statsLocked()
1436 }
1437
1438 func (t *Torrent) statsLocked() (ret TorrentStats) {
1439         ret.ActivePeers = len(t.conns)
1440         ret.HalfOpenPeers = len(t.halfOpen)
1441         ret.PendingPeers = t.peers.Len()
1442         ret.TotalPeers = t.numTotalPeers()
1443         ret.ConnectedSeeders = 0
1444         for c := range t.conns {
1445                 if all, ok := c.peerHasAllPieces(); all && ok {
1446                         ret.ConnectedSeeders++
1447                 }
1448         }
1449         ret.ConnStats = t.stats.Copy()
1450         return
1451 }
1452
1453 // The total number of peers in the torrent.
1454 func (t *Torrent) numTotalPeers() int {
1455         peers := make(map[string]struct{})
1456         for conn := range t.conns {
1457                 ra := conn.conn.RemoteAddr()
1458                 if ra == nil {
1459                         // It's been closed and doesn't support RemoteAddr.
1460                         continue
1461                 }
1462                 peers[ra.String()] = struct{}{}
1463         }
1464         for addr := range t.halfOpen {
1465                 peers[addr] = struct{}{}
1466         }
1467         t.peers.Each(func(peer Peer) {
1468                 peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{}
1469         })
1470         return len(peers)
1471 }
1472
1473 // Reconcile bytes transferred before connection was associated with a
1474 // torrent.
1475 func (t *Torrent) reconcileHandshakeStats(c *connection) {
1476         if c.stats != (ConnStats{
1477                 // Handshakes should only increment these fields:
1478                 BytesWritten: c.stats.BytesWritten,
1479                 BytesRead:    c.stats.BytesRead,
1480         }) {
1481                 panic("bad stats")
1482         }
1483         c.postHandshakeStats(func(cs *ConnStats) {
1484                 cs.BytesRead.Add(c.stats.BytesRead.Int64())
1485                 cs.BytesWritten.Add(c.stats.BytesWritten.Int64())
1486         })
1487         c.reconciledHandshakeStats = true
1488 }
1489
1490 // Returns true if the connection is added.
1491 func (t *Torrent) addConnection(c *connection) (err error) {
1492         defer func() {
1493                 if err == nil {
1494                         torrent.Add("added connections", 1)
1495                 }
1496         }()
1497         if t.closed.IsSet() {
1498                 return errors.New("torrent closed")
1499         }
1500         for c0 := range t.conns {
1501                 if c.PeerID != c0.PeerID {
1502                         continue
1503                 }
1504                 if !t.cl.config.dropDuplicatePeerIds {
1505                         continue
1506                 }
1507                 if left, ok := c.hasPreferredNetworkOver(c0); ok && left {
1508                         c0.Close()
1509                         t.deleteConnection(c0)
1510                 } else {
1511                         return errors.New("existing connection preferred")
1512                 }
1513         }
1514         if len(t.conns) >= t.maxEstablishedConns {
1515                 c := t.worstBadConn()
1516                 if c == nil {
1517                         return errors.New("don't want conns")
1518                 }
1519                 c.Close()
1520                 t.deleteConnection(c)
1521         }
1522         if len(t.conns) >= t.maxEstablishedConns {
1523                 panic(len(t.conns))
1524         }
1525         t.conns[c] = struct{}{}
1526         return nil
1527 }
1528
1529 func (t *Torrent) wantConns() bool {
1530         if !t.networkingEnabled {
1531                 return false
1532         }
1533         if t.closed.IsSet() {
1534                 return false
1535         }
1536         if !t.seeding() && !t.needData() {
1537                 return false
1538         }
1539         if len(t.conns) < t.maxEstablishedConns {
1540                 return true
1541         }
1542         return t.worstBadConn() != nil
1543 }
1544
1545 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1546         t.cl.lock()
1547         defer t.cl.unlock()
1548         oldMax = t.maxEstablishedConns
1549         t.maxEstablishedConns = max
1550         wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
1551         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1552                 t.dropConnection(wcs.Pop().(*connection))
1553         }
1554         t.openNewConns()
1555         return oldMax
1556 }
1557
1558 func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) {
1559         log.Fmsg("hashed piece %d", piece).Add("piece", piece).Add("passed", correct).AddValue(debugLogValue).Log(t.logger)
1560         if t.closed.IsSet() {
1561                 return
1562         }
1563         p := &t.pieces[piece]
1564         touchers := t.reapPieceTouchers(piece)
1565         if p.storageCompletionOk {
1566                 // Don't score the first time a piece is hashed, it could be an
1567                 // initial check.
1568                 if correct {
1569                         pieceHashedCorrect.Add(1)
1570                 } else {
1571                         log.Printf("%s: piece %d (%s) failed hash: %d connections contributed", t, piece, p.hash, len(touchers))
1572                         pieceHashedNotCorrect.Add(1)
1573                 }
1574         }
1575         if correct {
1576                 if len(touchers) != 0 {
1577                         // Don't increment stats above connection-level for every involved
1578                         // connection.
1579                         t.allStats((*ConnStats).incrementPiecesDirtiedGood)
1580                 }
1581                 for _, c := range touchers {
1582                         c.stats.incrementPiecesDirtiedGood()
1583                 }
1584                 err := p.Storage().MarkComplete()
1585                 if err != nil {
1586                         log.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
1587                 }
1588         } else {
1589                 if len(touchers) != 0 {
1590                         // Don't increment stats above connection-level for every involved
1591                         // connection.
1592                         t.allStats((*ConnStats).incrementPiecesDirtiedBad)
1593                         for _, c := range touchers {
1594                                 // Y u do dis peer?!
1595                                 c.stats.incrementPiecesDirtiedBad()
1596                         }
1597                         slices.Sort(touchers, connLessTrusted)
1598                         if t.cl.config.Debug {
1599                                 log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int64) {
1600                                         for _, c := range touchers {
1601                                                 ret = append(ret, c.netGoodPiecesDirtied())
1602                                         }
1603                                         return
1604                                 }())
1605                         }
1606                         c := touchers[0]
1607                         t.cl.banPeerIP(c.remoteAddr.IP)
1608                         c.Drop()
1609                 }
1610                 t.onIncompletePiece(piece)
1611                 p.Storage().MarkNotComplete()
1612         }
1613         t.updatePieceCompletion(piece)
1614 }
1615
1616 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
1617         // TODO: Make faster
1618         for cn := range t.conns {
1619                 cn.tickleWriter()
1620         }
1621 }
1622
1623 func (t *Torrent) onPieceCompleted(piece pieceIndex) {
1624         t.pendAllChunkSpecs(piece)
1625         t.cancelRequestsForPiece(piece)
1626         for conn := range t.conns {
1627                 conn.Have(piece)
1628         }
1629 }
1630
1631 // Called when a piece is found to be not complete.
1632 func (t *Torrent) onIncompletePiece(piece pieceIndex) {
1633         if t.pieceAllDirty(piece) {
1634                 t.pendAllChunkSpecs(piece)
1635         }
1636         if !t.wantPieceIndex(piece) {
1637                 // log.Printf("piece %d incomplete and unwanted", piece)
1638                 return
1639         }
1640         // We could drop any connections that we told we have a piece that we
1641         // don't here. But there's a test failure, and it seems clients don't care
1642         // if you request pieces that you already claim to have. Pruning bad
1643         // connections might just remove any connections that aren't treating us
1644         // favourably anyway.
1645
1646         // for c := range t.conns {
1647         //      if c.sentHave(piece) {
1648         //              c.Drop()
1649         //      }
1650         // }
1651         for conn := range t.conns {
1652                 if conn.PeerHasPiece(piece) {
1653                         conn.updateRequests()
1654                 }
1655         }
1656 }
1657
1658 func (t *Torrent) verifyPiece(piece pieceIndex) {
1659         cl := t.cl
1660         cl.lock()
1661         defer cl.unlock()
1662         p := &t.pieces[piece]
1663         defer func() {
1664                 p.numVerifies++
1665                 cl.event.Broadcast()
1666         }()
1667         for p.hashing || t.storage == nil {
1668                 cl.event.Wait()
1669         }
1670         if !p.t.piecesQueuedForHash.Remove(bitmap.BitIndex(piece)) {
1671                 panic("piece was not queued")
1672         }
1673         t.updatePiecePriority(piece)
1674         if t.closed.IsSet() || t.pieceComplete(piece) {
1675                 return
1676         }
1677         p.hashing = true
1678         t.publishPieceChange(piece)
1679         t.updatePiecePriority(piece)
1680         t.storageLock.RLock()
1681         cl.unlock()
1682         sum := t.hashPiece(piece)
1683         t.storageLock.RUnlock()
1684         cl.lock()
1685         p.hashing = false
1686         t.updatePiecePriority(piece)
1687         t.pieceHashed(piece, sum == p.hash)
1688         t.publishPieceChange(piece)
1689 }
1690
1691 // Return the connections that touched a piece, and clear the entries while
1692 // doing it.
1693 func (t *Torrent) reapPieceTouchers(piece pieceIndex) (ret []*connection) {
1694         for c := range t.pieces[piece].dirtiers {
1695                 delete(c.peerTouchedPieces, piece)
1696                 ret = append(ret, c)
1697         }
1698         t.pieces[piece].dirtiers = nil
1699         return
1700 }
1701
1702 func (t *Torrent) connsAsSlice() (ret []*connection) {
1703         for c := range t.conns {
1704                 ret = append(ret, c)
1705         }
1706         return
1707 }
1708
1709 // Currently doesn't really queue, but should in the future.
1710 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
1711         piece := &t.pieces[pieceIndex]
1712         if piece.queuedForHash() {
1713                 return
1714         }
1715         t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
1716         t.publishPieceChange(pieceIndex)
1717         t.updatePiecePriority(pieceIndex)
1718         go t.verifyPiece(pieceIndex)
1719 }
1720
1721 func (t *Torrent) VerifyData() {
1722         for i := pieceIndex(0); i < t.NumPieces(); i++ {
1723                 t.Piece(i).VerifyData()
1724         }
1725 }
1726
1727 // Start the process of connecting to the given peer for the given torrent if
1728 // appropriate.
1729 func (t *Torrent) initiateConn(peer Peer) {
1730         if peer.Id == t.cl.peerID {
1731                 return
1732         }
1733         if t.cl.badPeerIPPort(peer.IP, peer.Port) {
1734                 return
1735         }
1736         addr := IpPort{peer.IP, uint16(peer.Port)}
1737         if t.addrActive(addr.String()) {
1738                 return
1739         }
1740         t.halfOpen[addr.String()] = peer
1741         go t.cl.outgoingConnection(t, addr, peer.Source)
1742 }
1743
1744 func (t *Torrent) AddClientPeer(cl *Client) {
1745         t.AddPeers(func() (ps []Peer) {
1746                 for _, la := range cl.ListenAddrs() {
1747                         ps = append(ps, Peer{
1748                                 IP:   missinggo.AddrIP(la),
1749                                 Port: missinggo.AddrPort(la),
1750                         })
1751                 }
1752                 return
1753         }())
1754 }
1755
1756 // All stats that include this Torrent. Useful when we want to increment
1757 // ConnStats but not for every connection.
1758 func (t *Torrent) allStats(f func(*ConnStats)) {
1759         f(&t.stats)
1760         f(&t.cl.stats)
1761 }
1762
1763 func (t *Torrent) hashingPiece(i pieceIndex) bool {
1764         return t.pieces[i].hashing
1765 }
1766
1767 func (t *Torrent) pieceQueuedForHash(i pieceIndex) bool {
1768         return t.piecesQueuedForHash.Get(bitmap.BitIndex(i))
1769 }
1770
1771 func (t *Torrent) dialTimeout() time.Duration {
1772         return reducedDialTimeout(t.cl.config.MinDialTimeout, t.cl.config.NominalDialTimeout, t.cl.config.HalfOpenConnsPerTorrent, t.peers.Len())
1773 }