]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
2937b94ca3b96e194e344851a33a9a6e05f212d3
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "crypto/sha1"
6         "errors"
7         "fmt"
8         "io"
9         "math/rand"
10         "net/url"
11         "os"
12         "sync"
13         "text/tabwriter"
14         "time"
15         "unsafe"
16
17         "github.com/davecgh/go-spew/spew"
18
19         "github.com/anacrolix/dht/v2"
20         "github.com/anacrolix/log"
21         "github.com/anacrolix/missinggo"
22         "github.com/anacrolix/missinggo/bitmap"
23         "github.com/anacrolix/missinggo/perf"
24         "github.com/anacrolix/missinggo/prioritybitmap"
25         "github.com/anacrolix/missinggo/pubsub"
26         "github.com/anacrolix/missinggo/slices"
27
28         "github.com/anacrolix/torrent/bencode"
29         "github.com/anacrolix/torrent/metainfo"
30         pp "github.com/anacrolix/torrent/peer_protocol"
31         "github.com/anacrolix/torrent/storage"
32         "github.com/anacrolix/torrent/tracker"
33 )
34
35 func (t *Torrent) chunkIndexSpec(chunkIndex pp.Integer, piece pieceIndex) chunkSpec {
36         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
37 }
38
39 // Maintains state of torrent within a Client. Many methods should not be called before the info is
40 // available, see .Info and .GotInfo.
41 type Torrent struct {
42         // Torrent-level aggregate statistics. First in struct to ensure 64-bit
43         // alignment. See #262.
44         stats  ConnStats
45         cl     *Client
46         logger log.Logger
47
48         networkingEnabled bool
49
50         // Determines what chunks to request from peers. 1: Favour higher priority
51         // pieces with some fuzzing to reduce overlaps and wastage across
52         // connections. 2: The fastest connection downloads strictly in order of
53         // priority, while all others adher to their piece inclications. 3:
54         // Requests are strictly by piece priority, and not duplicated until
55         // duplicateRequestTimeout is reached.
56         requestStrategy int
57         // How long to avoid duplicating a pending request.
58         duplicateRequestTimeout time.Duration
59
60         closed   missinggo.Event
61         infoHash metainfo.Hash
62         pieces   []Piece
63         // Values are the piece indices that changed.
64         pieceStateChanges *pubsub.PubSub
65         // The size of chunks to request from peers over the wire. This is
66         // normally 16KiB by convention these days.
67         chunkSize pp.Integer
68         chunkPool *sync.Pool
69         // Total length of the torrent in bytes. Stored because it's not O(1) to
70         // get this from the info dict.
71         length *int64
72
73         // The storage to open when the info dict becomes available.
74         storageOpener *storage.Client
75         // Storage for torrent data.
76         storage *storage.Torrent
77         // Read-locked for using storage, and write-locked for Closing.
78         storageLock sync.RWMutex
79
80         // TODO: Only announce stuff is used?
81         metainfo metainfo.MetaInfo
82
83         // The info dict. nil if we don't have it (yet).
84         info  *metainfo.Info
85         files *[]*File
86
87         // Active peer connections, running message stream loops. TODO: Make this
88         // open (not-closed) connections only.
89         conns               map[*connection]struct{}
90         maxEstablishedConns int
91         // Set of addrs to which we're attempting to connect. Connections are
92         // half-open until all handshakes are completed.
93         halfOpen    map[string]Peer
94         fastestConn *connection
95
96         // Reserve of peers to connect to. A peer can be both here and in the
97         // active connections if were told about the peer after connecting with
98         // them. That encourages us to reconnect to peers that are well known in
99         // the swarm.
100         peers          prioritizedPeers
101         wantPeersEvent missinggo.Event
102         // An announcer for each tracker URL.
103         trackerAnnouncers map[string]*trackerScraper
104         // How many times we've initiated a DHT announce. TODO: Move into stats.
105         numDHTAnnounces int
106
107         // Name used if the info name isn't available. Should be cleared when the
108         // Info does become available.
109         nameMu      sync.RWMutex
110         displayName string
111
112         // The bencoded bytes of the info dict. This is actively manipulated if
113         // the info bytes aren't initially available, and we try to fetch them
114         // from peers.
115         metadataBytes []byte
116         // Each element corresponds to the 16KiB metadata pieces. If true, we have
117         // received that piece.
118         metadataCompletedChunks []bool
119         metadataChanged         sync.Cond
120
121         // Set when .Info is obtained.
122         gotMetainfo missinggo.Event
123
124         readers               map[*reader]struct{}
125         readerNowPieces       bitmap.Bitmap
126         readerReadaheadPieces bitmap.Bitmap
127
128         // A cache of pieces we need to get. Calculated from various piece and
129         // file priorities and completion states elsewhere.
130         pendingPieces prioritybitmap.PriorityBitmap
131         // A cache of completed piece indices.
132         completedPieces bitmap.Bitmap
133         // Pieces that need to be hashed.
134         piecesQueuedForHash bitmap.Bitmap
135         activePieceHashes   int
136
137         // A pool of piece priorities []int for assignment to new connections.
138         // These "inclinations" are used to give connections preference for
139         // different pieces.
140         connPieceInclinationPool sync.Pool
141
142         // Count of each request across active connections.
143         pendingRequests map[request]int
144         // The last time we requested a chunk. Deleting the request from any
145         // connection will clear this value.
146         lastRequested map[request]*time.Timer
147 }
148
149 func (t *Torrent) tickleReaders() {
150         t.cl.event.Broadcast()
151 }
152
153 // Returns a channel that is closed when the Torrent is closed.
154 func (t *Torrent) Closed() <-chan struct{} {
155         return t.closed.LockedChan(t.cl.locker())
156 }
157
158 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
159 // pending, and half-open peers.
160 func (t *Torrent) KnownSwarm() (ks []Peer) {
161         // Add pending peers to the list
162         t.peers.Each(func(peer Peer) {
163                 ks = append(ks, peer)
164         })
165
166         // Add half-open peers to the list
167         for _, peer := range t.halfOpen {
168                 ks = append(ks, peer)
169         }
170
171         // Add active peers to the list
172         for conn := range t.conns {
173
174                 ks = append(ks, Peer{
175                         Id:     conn.PeerID,
176                         IP:     conn.remoteAddr.IP,
177                         Port:   int(conn.remoteAddr.Port),
178                         Source: conn.Discovery,
179                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
180                         // > But if we're not connected to them with an encrypted connection, I couldn't say
181                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
182                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
183                         // > connection. It's probably easiest to do the latter for now.
184                         // https://github.com/anacrolix/torrent/pull/188
185                         SupportsEncryption: conn.headerEncrypted,
186                 })
187         }
188
189         return
190 }
191
192 func (t *Torrent) setChunkSize(size pp.Integer) {
193         t.chunkSize = size
194         t.chunkPool = &sync.Pool{
195                 New: func() interface{} {
196                         b := make([]byte, size)
197                         return &b
198                 },
199         }
200 }
201
202 func (t *Torrent) pieceComplete(piece pieceIndex) bool {
203         return t.completedPieces.Get(bitmap.BitIndex(piece))
204 }
205
206 func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
207         return t.pieces[piece].Storage().Completion()
208 }
209
210 // There's a connection to that address already.
211 func (t *Torrent) addrActive(addr string) bool {
212         if _, ok := t.halfOpen[addr]; ok {
213                 return true
214         }
215         for c := range t.conns {
216                 ra := c.remoteAddr
217                 if ra.String() == addr {
218                         return true
219                 }
220         }
221         return false
222 }
223
224 func (t *Torrent) unclosedConnsAsSlice() (ret []*connection) {
225         ret = make([]*connection, 0, len(t.conns))
226         for c := range t.conns {
227                 if !c.closed.IsSet() {
228                         ret = append(ret, c)
229                 }
230         }
231         return
232 }
233
234 func (t *Torrent) addPeer(p Peer) {
235         cl := t.cl
236         peersAddedBySource.Add(string(p.Source), 1)
237         if t.closed.IsSet() {
238                 return
239         }
240         if cl.badPeerIPPort(p.IP, p.Port) {
241                 torrent.Add("peers not added because of bad addr", 1)
242                 return
243         }
244         if t.peers.Add(p) {
245                 torrent.Add("peers replaced", 1)
246         }
247         t.openNewConns()
248         for t.peers.Len() > cl.config.TorrentPeersHighWater {
249                 _, ok := t.peers.DeleteMin()
250                 if ok {
251                         torrent.Add("excess reserve peers discarded", 1)
252                 }
253         }
254 }
255
256 func (t *Torrent) invalidateMetadata() {
257         for i := range t.metadataCompletedChunks {
258                 t.metadataCompletedChunks[i] = false
259         }
260         t.nameMu.Lock()
261         t.info = nil
262         t.nameMu.Unlock()
263 }
264
265 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
266         if t.haveInfo() {
267                 return
268         }
269         if index >= len(t.metadataCompletedChunks) {
270                 t.logger.Printf("%s: ignoring metadata piece %d", t, index)
271                 return
272         }
273         copy(t.metadataBytes[(1<<14)*index:], data)
274         t.metadataCompletedChunks[index] = true
275 }
276
277 func (t *Torrent) metadataPieceCount() int {
278         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
279 }
280
281 func (t *Torrent) haveMetadataPiece(piece int) bool {
282         if t.haveInfo() {
283                 return (1<<14)*piece < len(t.metadataBytes)
284         } else {
285                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
286         }
287 }
288
289 func (t *Torrent) metadataSize() int {
290         return len(t.metadataBytes)
291 }
292
293 func infoPieceHashes(info *metainfo.Info) (ret [][]byte) {
294         for i := 0; i < len(info.Pieces); i += sha1.Size {
295                 ret = append(ret, info.Pieces[i:i+sha1.Size])
296         }
297         return
298 }
299
300 func (t *Torrent) makePieces() {
301         hashes := infoPieceHashes(t.info)
302         t.pieces = make([]Piece, len(hashes), len(hashes))
303         for i, hash := range hashes {
304                 piece := &t.pieces[i]
305                 piece.t = t
306                 piece.index = pieceIndex(i)
307                 piece.noPendingWrites.L = &piece.pendingWritesMutex
308                 piece.hash = (*metainfo.Hash)(unsafe.Pointer(&hash[0]))
309                 files := *t.files
310                 beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
311                 endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
312                 piece.files = files[beginFile:endFile]
313         }
314 }
315
316 // Returns the index of the first file containing the piece. files must be
317 // ordered by offset.
318 func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
319         for i, f := range files {
320                 if f.offset+f.length > pieceOffset {
321                         return i
322                 }
323         }
324         return 0
325 }
326
327 // Returns the index after the last file containing the piece. files must be
328 // ordered by offset.
329 func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
330         for i, f := range files {
331                 if f.offset+f.length >= pieceEndOffset {
332                         return i + 1
333                 }
334         }
335         return 0
336 }
337
338 func (t *Torrent) cacheLength() {
339         var l int64
340         for _, f := range t.info.UpvertedFiles() {
341                 l += f.Length
342         }
343         t.length = &l
344 }
345
346 func (t *Torrent) setInfo(info *metainfo.Info) error {
347         if err := validateInfo(info); err != nil {
348                 return fmt.Errorf("bad info: %s", err)
349         }
350         if t.storageOpener != nil {
351                 var err error
352                 t.storage, err = t.storageOpener.OpenTorrent(info, t.infoHash)
353                 if err != nil {
354                         return fmt.Errorf("error opening torrent storage: %s", err)
355                 }
356         }
357         t.nameMu.Lock()
358         t.info = info
359         t.nameMu.Unlock()
360         t.displayName = "" // Save a few bytes lol.
361         t.initFiles()
362         t.cacheLength()
363         t.makePieces()
364         return nil
365 }
366
367 func (t *Torrent) onSetInfo() {
368         for conn := range t.conns {
369                 if err := conn.setNumPieces(t.numPieces()); err != nil {
370                         t.logger.Printf("closing connection: %s", err)
371                         conn.Close()
372                 }
373         }
374         for i := range t.pieces {
375                 t.updatePieceCompletion(pieceIndex(i))
376                 p := &t.pieces[i]
377                 if !p.storageCompletionOk {
378                         // t.logger.Printf("piece %s completion unknown, queueing check", p)
379                         t.queuePieceCheck(pieceIndex(i))
380                 }
381         }
382         t.cl.event.Broadcast()
383         t.gotMetainfo.Set()
384         t.updateWantPeersEvent()
385         t.pendingRequests = make(map[request]int)
386         t.lastRequested = make(map[request]*time.Timer)
387         t.tryCreateMorePieceHashers()
388 }
389
390 // Called when metadata for a torrent becomes available.
391 func (t *Torrent) setInfoBytes(b []byte) error {
392         if metainfo.HashBytes(b) != t.infoHash {
393                 return errors.New("info bytes have wrong hash")
394         }
395         var info metainfo.Info
396         if err := bencode.Unmarshal(b, &info); err != nil {
397                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
398         }
399         if err := t.setInfo(&info); err != nil {
400                 return err
401         }
402         t.metadataBytes = b
403         t.metadataCompletedChunks = nil
404         t.onSetInfo()
405         return nil
406 }
407
408 func (t *Torrent) haveAllMetadataPieces() bool {
409         if t.haveInfo() {
410                 return true
411         }
412         if t.metadataCompletedChunks == nil {
413                 return false
414         }
415         for _, have := range t.metadataCompletedChunks {
416                 if !have {
417                         return false
418                 }
419         }
420         return true
421 }
422
423 // TODO: Propagate errors to disconnect peer.
424 func (t *Torrent) setMetadataSize(bytes int) (err error) {
425         if t.haveInfo() {
426                 // We already know the correct metadata size.
427                 return
428         }
429         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
430                 return errors.New("bad size")
431         }
432         if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
433                 return
434         }
435         t.metadataBytes = make([]byte, bytes)
436         t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
437         t.metadataChanged.Broadcast()
438         for c := range t.conns {
439                 c.requestPendingMetadata()
440         }
441         return
442 }
443
444 // The current working name for the torrent. Either the name in the info dict,
445 // or a display name given such as by the dn value in a magnet link, or "".
446 func (t *Torrent) name() string {
447         t.nameMu.RLock()
448         defer t.nameMu.RUnlock()
449         if t.haveInfo() {
450                 return t.info.Name
451         }
452         return t.displayName
453 }
454
455 func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) {
456         p := &t.pieces[index]
457         ret.Priority = t.piecePriority(index)
458         ret.Completion = p.completion()
459         if p.queuedForHash() || p.hashing {
460                 ret.Checking = true
461         }
462         if !ret.Complete && t.piecePartiallyDownloaded(index) {
463                 ret.Partial = true
464         }
465         return
466 }
467
468 func (t *Torrent) metadataPieceSize(piece int) int {
469         return metadataPieceSize(len(t.metadataBytes), piece)
470 }
471
472 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
473         d := map[string]int{
474                 "msg_type": msgType,
475                 "piece":    piece,
476         }
477         if data != nil {
478                 d["total_size"] = len(t.metadataBytes)
479         }
480         p := bencode.MustMarshal(d)
481         return pp.Message{
482                 Type:            pp.Extended,
483                 ExtendedID:      c.PeerExtensionIDs[pp.ExtensionNameMetadata],
484                 ExtendedPayload: append(p, data...),
485         }
486 }
487
488 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
489         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
490                 ret = append(ret, PieceStateRun{
491                         PieceState: el.(PieceState),
492                         Length:     int(count),
493                 })
494         })
495         for index := range t.pieces {
496                 rle.Append(t.pieceState(pieceIndex(index)), 1)
497         }
498         rle.Flush()
499         return
500 }
501
502 // Produces a small string representing a PieceStateRun.
503 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
504         ret = fmt.Sprintf("%d", psr.Length)
505         ret += func() string {
506                 switch psr.Priority {
507                 case PiecePriorityNext:
508                         return "N"
509                 case PiecePriorityNormal:
510                         return "."
511                 case PiecePriorityReadahead:
512                         return "R"
513                 case PiecePriorityNow:
514                         return "!"
515                 case PiecePriorityHigh:
516                         return "H"
517                 default:
518                         return ""
519                 }
520         }()
521         if psr.Checking {
522                 ret += "H"
523         }
524         if psr.Partial {
525                 ret += "P"
526         }
527         if psr.Complete {
528                 ret += "C"
529         }
530         if !psr.Ok {
531                 ret += "?"
532         }
533         return
534 }
535
536 func (t *Torrent) writeStatus(w io.Writer) {
537         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
538         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
539         if !t.haveInfo() {
540                 fmt.Fprintf(w, "Metadata have: ")
541                 for _, h := range t.metadataCompletedChunks {
542                         fmt.Fprintf(w, "%c", func() rune {
543                                 if h {
544                                         return 'H'
545                                 } else {
546                                         return '.'
547                                 }
548                         }())
549                 }
550                 fmt.Fprintln(w)
551         }
552         fmt.Fprintf(w, "Piece length: %s\n", func() string {
553                 if t.haveInfo() {
554                         return fmt.Sprint(t.usualPieceSize())
555                 } else {
556                         return "?"
557                 }
558         }())
559         if t.info != nil {
560                 fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
561                 fmt.Fprint(w, "Piece States:")
562                 for _, psr := range t.pieceStateRuns() {
563                         w.Write([]byte(" "))
564                         w.Write([]byte(pieceStateRunStatusChars(psr)))
565                 }
566                 fmt.Fprintln(w)
567         }
568         fmt.Fprintf(w, "Reader Pieces:")
569         t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
570                 fmt.Fprintf(w, " %d:%d", begin, end)
571                 return true
572         })
573         fmt.Fprintln(w)
574
575         fmt.Fprintf(w, "Enabled trackers:\n")
576         func() {
577                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
578                 fmt.Fprintf(tw, "    URL\tNext announce\tLast announce\n")
579                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
580                         lu := l.u
581                         ru := r.u
582                         var luns, runs url.URL = lu, ru
583                         luns.Scheme = ""
584                         runs.Scheme = ""
585                         var ml missinggo.MultiLess
586                         ml.StrictNext(luns.String() == runs.String(), luns.String() < runs.String())
587                         ml.StrictNext(lu.String() == ru.String(), lu.String() < ru.String())
588                         return ml.Less()
589                 }).([]*trackerScraper) {
590                         fmt.Fprintf(tw, "    %s\n", ta.statusLine())
591                 }
592                 tw.Flush()
593         }()
594
595         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
596
597         spew.NewDefaultConfig()
598         spew.Fdump(w, t.statsLocked())
599
600         conns := t.connsAsSlice()
601         slices.Sort(conns, worseConn)
602         for i, c := range conns {
603                 fmt.Fprintf(w, "%2d. ", i+1)
604                 c.WriteStatus(w, t)
605         }
606 }
607
608 func (t *Torrent) haveInfo() bool {
609         return t.info != nil
610 }
611
612 // Returns a run-time generated MetaInfo that includes the info bytes and
613 // announce-list as currently known to the client.
614 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
615         return metainfo.MetaInfo{
616                 CreationDate: time.Now().Unix(),
617                 Comment:      "dynamic metainfo from client",
618                 CreatedBy:    "go.torrent",
619                 AnnounceList: t.metainfo.UpvertedAnnounceList(),
620                 InfoBytes: func() []byte {
621                         if t.haveInfo() {
622                                 return t.metadataBytes
623                         } else {
624                                 return nil
625                         }
626                 }(),
627         }
628 }
629
630 func (t *Torrent) BytesMissing() int64 {
631         t.cl.rLock()
632         defer t.cl.rUnlock()
633         return t.bytesMissingLocked()
634 }
635
636 func (t *Torrent) bytesMissingLocked() int64 {
637         return t.bytesLeft()
638 }
639
640 func (t *Torrent) bytesLeft() (left int64) {
641         bitmap.Flip(t.completedPieces, 0, bitmap.BitIndex(t.numPieces())).IterTyped(func(piece int) bool {
642                 p := &t.pieces[piece]
643                 left += int64(p.length() - p.numDirtyBytes())
644                 return true
645         })
646         return
647 }
648
649 // Bytes left to give in tracker announces.
650 func (t *Torrent) bytesLeftAnnounce() int64 {
651         if t.haveInfo() {
652                 return t.bytesLeft()
653         } else {
654                 return -1
655         }
656 }
657
658 func (t *Torrent) piecePartiallyDownloaded(piece pieceIndex) bool {
659         if t.pieceComplete(piece) {
660                 return false
661         }
662         if t.pieceAllDirty(piece) {
663                 return false
664         }
665         return t.pieces[piece].hasDirtyChunks()
666 }
667
668 func (t *Torrent) usualPieceSize() int {
669         return int(t.info.PieceLength)
670 }
671
672 func (t *Torrent) numPieces() pieceIndex {
673         return pieceIndex(t.info.NumPieces())
674 }
675
676 func (t *Torrent) numPiecesCompleted() (num int) {
677         return t.completedPieces.Len()
678 }
679
680 func (t *Torrent) close() (err error) {
681         t.closed.Set()
682         t.tickleReaders()
683         if t.storage != nil {
684                 t.storageLock.Lock()
685                 t.storage.Close()
686                 t.storageLock.Unlock()
687         }
688         for conn := range t.conns {
689                 conn.Close()
690         }
691         t.cl.event.Broadcast()
692         t.pieceStateChanges.Close()
693         t.updateWantPeersEvent()
694         return
695 }
696
697 func (t *Torrent) requestOffset(r request) int64 {
698         return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
699 }
700
701 // Return the request that would include the given offset into the torrent
702 // data. Returns !ok if there is no such request.
703 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
704         return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
705 }
706
707 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
708         defer perf.ScopeTimerErr(&err)()
709         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
710         if err == nil && n != len(data) {
711                 err = io.ErrShortWrite
712         }
713         return
714 }
715
716 func (t *Torrent) bitfield() (bf []bool) {
717         bf = make([]bool, t.numPieces())
718         t.completedPieces.IterTyped(func(piece int) (again bool) {
719                 bf[piece] = true
720                 return true
721         })
722         return
723 }
724
725 func (t *Torrent) pieceNumChunks(piece pieceIndex) pp.Integer {
726         return (t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize
727 }
728
729 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
730         t.pieces[pieceIndex].dirtyChunks.Clear()
731 }
732
733 func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
734         if t.info.PieceLength == 0 {
735                 // There will be no variance amongst pieces. Only pain.
736                 return 0
737         }
738         if piece == t.numPieces()-1 {
739                 ret := pp.Integer(*t.length % t.info.PieceLength)
740                 if ret != 0 {
741                         return ret
742                 }
743         }
744         return pp.Integer(t.info.PieceLength)
745 }
746
747 func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash) {
748         hash := pieceHash.New()
749         p := t.piece(piece)
750         p.waitNoPendingWrites()
751         ip := t.info.Piece(int(piece))
752         pl := ip.Length()
753         n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
754         if n == pl {
755                 missinggo.CopyExact(&ret, hash.Sum(nil))
756                 return
757         }
758         if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
759                 t.logger.Printf("unexpected error hashing piece %d through %T: %s", piece, t.storage.TorrentImpl, err)
760         }
761         return
762 }
763
764 func (t *Torrent) haveAnyPieces() bool {
765         return t.completedPieces.Len() != 0
766 }
767
768 func (t *Torrent) haveAllPieces() bool {
769         if !t.haveInfo() {
770                 return false
771         }
772         return t.completedPieces.Len() == bitmap.BitIndex(t.numPieces())
773 }
774
775 func (t *Torrent) havePiece(index pieceIndex) bool {
776         return t.haveInfo() && t.pieceComplete(index)
777 }
778
779 func (t *Torrent) haveChunk(r request) (ret bool) {
780         // defer func() {
781         //      log.Println("have chunk", r, ret)
782         // }()
783         if !t.haveInfo() {
784                 return false
785         }
786         if t.pieceComplete(pieceIndex(r.Index)) {
787                 return true
788         }
789         p := &t.pieces[r.Index]
790         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
791 }
792
793 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
794         return int(cs.Begin / chunkSize)
795 }
796
797 func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
798         if !t.haveInfo() {
799                 return false
800         }
801         if index < 0 || index >= t.numPieces() {
802                 return false
803         }
804         p := &t.pieces[index]
805         if p.queuedForHash() {
806                 return false
807         }
808         if p.hashing {
809                 return false
810         }
811         if t.pieceComplete(index) {
812                 return false
813         }
814         if t.pendingPieces.Contains(bitmap.BitIndex(index)) {
815                 return true
816         }
817         // t.logger.Printf("piece %d not pending", index)
818         return !t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
819                 return index < begin || index >= end
820         })
821 }
822
823 // The worst connection is one that hasn't been sent, or sent anything useful
824 // for the longest. A bad connection is one that usually sends us unwanted
825 // pieces, or has been in worser half of the established connections for more
826 // than a minute.
827 func (t *Torrent) worstBadConn() *connection {
828         wcs := worseConnSlice{t.unclosedConnsAsSlice()}
829         heap.Init(&wcs)
830         for wcs.Len() != 0 {
831                 c := heap.Pop(&wcs).(*connection)
832                 if c.stats.ChunksReadWasted.Int64() >= 6 && c.stats.ChunksReadWasted.Int64() > c.stats.ChunksReadUseful.Int64() {
833                         return c
834                 }
835                 // If the connection is in the worst half of the established
836                 // connection quota and is older than a minute.
837                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
838                         // Give connections 1 minute to prove themselves.
839                         if time.Since(c.completedHandshake) > time.Minute {
840                                 return c
841                         }
842                 }
843         }
844         return nil
845 }
846
847 type PieceStateChange struct {
848         Index int
849         PieceState
850 }
851
852 func (t *Torrent) publishPieceChange(piece pieceIndex) {
853         t.cl._mu.Defer(func() {
854                 cur := t.pieceState(piece)
855                 p := &t.pieces[piece]
856                 if cur != p.publicPieceState {
857                         p.publicPieceState = cur
858                         t.pieceStateChanges.Publish(PieceStateChange{
859                                 int(piece),
860                                 cur,
861                         })
862                 }
863         })
864 }
865
866 func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
867         if t.pieceComplete(piece) {
868                 return 0
869         }
870         return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
871 }
872
873 func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
874         return t.pieces[piece].dirtyChunks.Len() == int(t.pieceNumChunks(piece))
875 }
876
877 func (t *Torrent) readersChanged() {
878         t.updateReaderPieces()
879         t.updateAllPiecePriorities()
880 }
881
882 func (t *Torrent) updateReaderPieces() {
883         t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities()
884 }
885
886 func (t *Torrent) readerPosChanged(from, to pieceRange) {
887         if from == to {
888                 return
889         }
890         t.updateReaderPieces()
891         // Order the ranges, high and low.
892         l, h := from, to
893         if l.begin > h.begin {
894                 l, h = h, l
895         }
896         if l.end < h.begin {
897                 // Two distinct ranges.
898                 t.updatePiecePriorities(l.begin, l.end)
899                 t.updatePiecePriorities(h.begin, h.end)
900         } else {
901                 // Ranges overlap.
902                 end := l.end
903                 if h.end > end {
904                         end = h.end
905                 }
906                 t.updatePiecePriorities(l.begin, end)
907         }
908 }
909
910 func (t *Torrent) maybeNewConns() {
911         // Tickle the accept routine.
912         t.cl.event.Broadcast()
913         t.openNewConns()
914 }
915
916 func (t *Torrent) piecePriorityChanged(piece pieceIndex) {
917         // t.logger.Printf("piece %d priority changed", piece)
918         for c := range t.conns {
919                 if c.updatePiecePriority(piece) {
920                         // log.Print("conn piece priority changed")
921                         c.updateRequests()
922                 }
923         }
924         t.maybeNewConns()
925         t.publishPieceChange(piece)
926 }
927
928 func (t *Torrent) updatePiecePriority(piece pieceIndex) {
929         p := &t.pieces[piece]
930         newPrio := p.uncachedPriority()
931         // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
932         if newPrio == PiecePriorityNone {
933                 if !t.pendingPieces.Remove(bitmap.BitIndex(piece)) {
934                         return
935                 }
936         } else {
937                 if !t.pendingPieces.Set(bitmap.BitIndex(piece), newPrio.BitmapPriority()) {
938                         return
939                 }
940         }
941         t.piecePriorityChanged(piece)
942 }
943
944 func (t *Torrent) updateAllPiecePriorities() {
945         t.updatePiecePriorities(0, t.numPieces())
946 }
947
948 // Update all piece priorities in one hit. This function should have the same
949 // output as updatePiecePriority, but across all pieces.
950 func (t *Torrent) updatePiecePriorities(begin, end pieceIndex) {
951         for i := begin; i < end; i++ {
952                 t.updatePiecePriority(i)
953         }
954 }
955
956 // Returns the range of pieces [begin, end) that contains the extent of bytes.
957 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
958         if off >= *t.length {
959                 return
960         }
961         if off < 0 {
962                 size += off
963                 off = 0
964         }
965         if size <= 0 {
966                 return
967         }
968         begin = pieceIndex(off / t.info.PieceLength)
969         end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
970         if end > pieceIndex(t.info.NumPieces()) {
971                 end = pieceIndex(t.info.NumPieces())
972         }
973         return
974 }
975
976 // Returns true if all iterations complete without breaking. Returns the read
977 // regions for all readers. The reader regions should not be merged as some
978 // callers depend on this method to enumerate readers.
979 func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool)) (all bool) {
980         for r := range t.readers {
981                 p := r.pieces
982                 if p.begin >= p.end {
983                         continue
984                 }
985                 if !f(p.begin, p.end) {
986                         return false
987                 }
988         }
989         return true
990 }
991
992 func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
993         prio, ok := t.pendingPieces.GetPriority(bitmap.BitIndex(piece))
994         if !ok {
995                 return PiecePriorityNone
996         }
997         if prio > 0 {
998                 panic(prio)
999         }
1000         ret := piecePriority(-prio)
1001         if ret == PiecePriorityNone {
1002                 panic(piece)
1003         }
1004         return ret
1005 }
1006
1007 func (t *Torrent) pendRequest(req request) {
1008         ci := chunkIndex(req.chunkSpec, t.chunkSize)
1009         t.pieces[req.Index].pendChunkIndex(ci)
1010 }
1011
1012 func (t *Torrent) pieceCompletionChanged(piece pieceIndex) {
1013         t.tickleReaders()
1014         t.cl.event.Broadcast()
1015         if t.pieceComplete(piece) {
1016                 t.onPieceCompleted(piece)
1017         } else {
1018                 t.onIncompletePiece(piece)
1019         }
1020         t.updatePiecePriority(piece)
1021 }
1022
1023 func (t *Torrent) numReceivedConns() (ret int) {
1024         for c := range t.conns {
1025                 if c.Discovery == peerSourceIncoming {
1026                         ret++
1027                 }
1028         }
1029         return
1030 }
1031
1032 func (t *Torrent) maxHalfOpen() int {
1033         // Note that if we somehow exceed the maximum established conns, we want
1034         // the negative value to have an effect.
1035         establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
1036         extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
1037         // We want to allow some experimentation with new peers, and to try to
1038         // upset an oversupply of received connections.
1039         return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.config.HalfOpenConnsPerTorrent)))
1040 }
1041
1042 func (t *Torrent) openNewConns() {
1043         defer t.updateWantPeersEvent()
1044         for t.peers.Len() != 0 {
1045                 if !t.wantConns() {
1046                         return
1047                 }
1048                 if len(t.halfOpen) >= t.maxHalfOpen() {
1049                         return
1050                 }
1051                 p := t.peers.PopMax()
1052                 t.initiateConn(p)
1053         }
1054 }
1055
1056 func (t *Torrent) getConnPieceInclination() []int {
1057         _ret := t.connPieceInclinationPool.Get()
1058         if _ret == nil {
1059                 pieceInclinationsNew.Add(1)
1060                 return rand.Perm(int(t.numPieces()))
1061         }
1062         pieceInclinationsReused.Add(1)
1063         return *_ret.(*[]int)
1064 }
1065
1066 func (t *Torrent) putPieceInclination(pi []int) {
1067         t.connPieceInclinationPool.Put(&pi)
1068         pieceInclinationsPut.Add(1)
1069 }
1070
1071 func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
1072         p := t.piece(piece)
1073         uncached := t.pieceCompleteUncached(piece)
1074         cached := p.completion()
1075         changed := cached != uncached
1076         p.storageCompletionOk = uncached.Ok
1077         t.completedPieces.Set(bitmap.BitIndex(piece), uncached.Complete)
1078         if changed {
1079                 log.Fstr("piece %d completion changed: %+v -> %+v", piece, cached, uncached).WithValues(debugLogValue).Log(t.logger)
1080                 t.pieceCompletionChanged(piece)
1081         }
1082         return changed
1083 }
1084
1085 // Non-blocking read. Client lock is not required.
1086 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1087         p := &t.pieces[off/t.info.PieceLength]
1088         p.waitNoPendingWrites()
1089         return p.Storage().ReadAt(b, off-p.Info().Offset())
1090 }
1091
1092 func (t *Torrent) updateAllPieceCompletions() {
1093         for i := pieceIndex(0); i < t.numPieces(); i++ {
1094                 t.updatePieceCompletion(i)
1095         }
1096 }
1097
1098 // Returns an error if the metadata was completed, but couldn't be set for
1099 // some reason. Blame it on the last peer to contribute.
1100 func (t *Torrent) maybeCompleteMetadata() error {
1101         if t.haveInfo() {
1102                 // Nothing to do.
1103                 return nil
1104         }
1105         if !t.haveAllMetadataPieces() {
1106                 // Don't have enough metadata pieces.
1107                 return nil
1108         }
1109         err := t.setInfoBytes(t.metadataBytes)
1110         if err != nil {
1111                 t.invalidateMetadata()
1112                 return fmt.Errorf("error setting info bytes: %s", err)
1113         }
1114         if t.cl.config.Debug {
1115                 t.logger.Printf("%s: got metadata from peers", t)
1116         }
1117         return nil
1118 }
1119
1120 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1121         t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
1122                 if end > begin {
1123                         now.Add(bitmap.BitIndex(begin))
1124                         readahead.AddRange(bitmap.BitIndex(begin)+1, bitmap.BitIndex(end))
1125                 }
1126                 return true
1127         })
1128         return
1129 }
1130
1131 func (t *Torrent) needData() bool {
1132         if t.closed.IsSet() {
1133                 return false
1134         }
1135         if !t.haveInfo() {
1136                 return true
1137         }
1138         return t.pendingPieces.Len() != 0
1139 }
1140
1141 func appendMissingStrings(old, new []string) (ret []string) {
1142         ret = old
1143 new:
1144         for _, n := range new {
1145                 for _, o := range old {
1146                         if o == n {
1147                                 continue new
1148                         }
1149                 }
1150                 ret = append(ret, n)
1151         }
1152         return
1153 }
1154
1155 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1156         ret = existing
1157         for minNumTiers > len(ret) {
1158                 ret = append(ret, nil)
1159         }
1160         return
1161 }
1162
1163 func (t *Torrent) addTrackers(announceList [][]string) {
1164         fullAnnounceList := &t.metainfo.AnnounceList
1165         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1166         for tierIndex, trackerURLs := range announceList {
1167                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1168         }
1169         t.startMissingTrackerScrapers()
1170         t.updateWantPeersEvent()
1171 }
1172
1173 // Don't call this before the info is available.
1174 func (t *Torrent) bytesCompleted() int64 {
1175         if !t.haveInfo() {
1176                 return 0
1177         }
1178         return t.info.TotalLength() - t.bytesLeft()
1179 }
1180
1181 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1182         t.cl.lock()
1183         defer t.cl.unlock()
1184         return t.setInfoBytes(b)
1185 }
1186
1187 // Returns true if connection is removed from torrent.Conns.
1188 func (t *Torrent) deleteConnection(c *connection) (ret bool) {
1189         if !c.closed.IsSet() {
1190                 panic("connection is not closed")
1191                 // There are behaviours prevented by the closed state that will fail
1192                 // if the connection has been deleted.
1193         }
1194         _, ret = t.conns[c]
1195         delete(t.conns, c)
1196         torrent.Add("deleted connections", 1)
1197         c.deleteAllRequests()
1198         if len(t.conns) == 0 {
1199                 t.assertNoPendingRequests()
1200         }
1201         return
1202 }
1203
1204 func (t *Torrent) assertNoPendingRequests() {
1205         if len(t.pendingRequests) != 0 {
1206                 panic(t.pendingRequests)
1207         }
1208         if len(t.lastRequested) != 0 {
1209                 panic(t.lastRequested)
1210         }
1211 }
1212
1213 func (t *Torrent) dropConnection(c *connection) {
1214         t.cl.event.Broadcast()
1215         c.Close()
1216         if t.deleteConnection(c) {
1217                 t.openNewConns()
1218         }
1219 }
1220
1221 func (t *Torrent) wantPeers() bool {
1222         if t.closed.IsSet() {
1223                 return false
1224         }
1225         if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
1226                 return false
1227         }
1228         return t.needData() || t.seeding()
1229 }
1230
1231 func (t *Torrent) updateWantPeersEvent() {
1232         if t.wantPeers() {
1233                 t.wantPeersEvent.Set()
1234         } else {
1235                 t.wantPeersEvent.Clear()
1236         }
1237 }
1238
1239 // Returns whether the client should make effort to seed the torrent.
1240 func (t *Torrent) seeding() bool {
1241         cl := t.cl
1242         if t.closed.IsSet() {
1243                 return false
1244         }
1245         if cl.config.NoUpload {
1246                 return false
1247         }
1248         if !cl.config.Seed {
1249                 return false
1250         }
1251         if cl.config.DisableAggressiveUpload && t.needData() {
1252                 return false
1253         }
1254         return true
1255 }
1256
1257 func (t *Torrent) startScrapingTracker(_url string) {
1258         if _url == "" {
1259                 return
1260         }
1261         u, err := url.Parse(_url)
1262         if err != nil {
1263                 // URLs with a leading '*' appear to be a uTorrent convention to
1264                 // disable trackers.
1265                 if _url[0] != '*' {
1266                         log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger)
1267                 }
1268                 return
1269         }
1270         if u.Scheme == "udp" {
1271                 u.Scheme = "udp4"
1272                 t.startScrapingTracker(u.String())
1273                 u.Scheme = "udp6"
1274                 t.startScrapingTracker(u.String())
1275                 return
1276         }
1277         if u.Scheme == "udp4" && (t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4) {
1278                 return
1279         }
1280         if u.Scheme == "udp6" && t.cl.config.DisableIPv6 {
1281                 return
1282         }
1283         if _, ok := t.trackerAnnouncers[_url]; ok {
1284                 return
1285         }
1286         newAnnouncer := &trackerScraper{
1287                 u: *u,
1288                 t: t,
1289         }
1290         if t.trackerAnnouncers == nil {
1291                 t.trackerAnnouncers = make(map[string]*trackerScraper)
1292         }
1293         t.trackerAnnouncers[_url] = newAnnouncer
1294         go newAnnouncer.Run()
1295 }
1296
1297 // Adds and starts tracker scrapers for tracker URLs that aren't already
1298 // running.
1299 func (t *Torrent) startMissingTrackerScrapers() {
1300         if t.cl.config.DisableTrackers {
1301                 return
1302         }
1303         t.startScrapingTracker(t.metainfo.Announce)
1304         for _, tier := range t.metainfo.AnnounceList {
1305                 for _, url := range tier {
1306                         t.startScrapingTracker(url)
1307                 }
1308         }
1309 }
1310
1311 // Returns an AnnounceRequest with fields filled out to defaults and current
1312 // values.
1313 func (t *Torrent) announceRequest(event tracker.AnnounceEvent) tracker.AnnounceRequest {
1314         // Note that IPAddress is not set. It's set for UDP inside the tracker
1315         // code, since it's dependent on the network in use.
1316         return tracker.AnnounceRequest{
1317                 Event:    event,
1318                 NumWant:  -1,
1319                 Port:     uint16(t.cl.incomingPeerPort()),
1320                 PeerId:   t.cl.peerID,
1321                 InfoHash: t.infoHash,
1322                 Key:      t.cl.announceKey(),
1323
1324                 // The following are vaguely described in BEP 3.
1325
1326                 Left:     t.bytesLeftAnnounce(),
1327                 Uploaded: t.stats.BytesWrittenData.Int64(),
1328                 // There's no mention of wasted or unwanted download in the BEP.
1329                 Downloaded: t.stats.BytesReadUsefulData.Int64(),
1330         }
1331 }
1332
1333 // Adds peers revealed in an announce until the announce ends, or we have
1334 // enough peers.
1335 func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
1336         cl := t.cl
1337         for v := range pvs {
1338                 cl.lock()
1339                 for _, cp := range v.Peers {
1340                         if cp.Port == 0 {
1341                                 // Can't do anything with this.
1342                                 continue
1343                         }
1344                         t.addPeer(Peer{
1345                                 IP:     cp.IP[:],
1346                                 Port:   cp.Port,
1347                                 Source: peerSourceDhtGetPeers,
1348                         })
1349                 }
1350                 cl.unlock()
1351         }
1352 }
1353
1354 func (t *Torrent) announceToDht(impliedPort bool, s *dht.Server) error {
1355         ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), impliedPort)
1356         if err != nil {
1357                 return err
1358         }
1359         go t.consumeDhtAnnouncePeers(ps.Peers)
1360         select {
1361         case <-t.closed.LockedChan(t.cl.locker()):
1362         case <-time.After(5 * time.Minute):
1363         }
1364         ps.Close()
1365         return nil
1366 }
1367
1368 func (t *Torrent) dhtAnnouncer(s *dht.Server) {
1369         cl := t.cl
1370         for {
1371                 select {
1372                 case <-t.closed.LockedChan(cl.locker()):
1373                         return
1374                 case <-t.wantPeersEvent.LockedChan(cl.locker()):
1375                 }
1376                 cl.lock()
1377                 t.numDHTAnnounces++
1378                 cl.unlock()
1379                 err := t.announceToDht(true, s)
1380                 if err != nil {
1381                         t.logger.Printf("error announcing %q to DHT: %s", t, err)
1382                 }
1383         }
1384 }
1385
1386 func (t *Torrent) addPeers(peers []Peer) {
1387         for _, p := range peers {
1388                 t.addPeer(p)
1389         }
1390 }
1391
1392 func (t *Torrent) Stats() TorrentStats {
1393         t.cl.rLock()
1394         defer t.cl.rUnlock()
1395         return t.statsLocked()
1396 }
1397
1398 func (t *Torrent) statsLocked() (ret TorrentStats) {
1399         ret.ActivePeers = len(t.conns)
1400         ret.HalfOpenPeers = len(t.halfOpen)
1401         ret.PendingPeers = t.peers.Len()
1402         ret.TotalPeers = t.numTotalPeers()
1403         ret.ConnectedSeeders = 0
1404         for c := range t.conns {
1405                 if all, ok := c.peerHasAllPieces(); all && ok {
1406                         ret.ConnectedSeeders++
1407                 }
1408         }
1409         ret.ConnStats = t.stats.Copy()
1410         return
1411 }
1412
1413 // The total number of peers in the torrent.
1414 func (t *Torrent) numTotalPeers() int {
1415         peers := make(map[string]struct{})
1416         for conn := range t.conns {
1417                 ra := conn.conn.RemoteAddr()
1418                 if ra == nil {
1419                         // It's been closed and doesn't support RemoteAddr.
1420                         continue
1421                 }
1422                 peers[ra.String()] = struct{}{}
1423         }
1424         for addr := range t.halfOpen {
1425                 peers[addr] = struct{}{}
1426         }
1427         t.peers.Each(func(peer Peer) {
1428                 peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{}
1429         })
1430         return len(peers)
1431 }
1432
1433 // Reconcile bytes transferred before connection was associated with a
1434 // torrent.
1435 func (t *Torrent) reconcileHandshakeStats(c *connection) {
1436         if c.stats != (ConnStats{
1437                 // Handshakes should only increment these fields:
1438                 BytesWritten: c.stats.BytesWritten,
1439                 BytesRead:    c.stats.BytesRead,
1440         }) {
1441                 panic("bad stats")
1442         }
1443         c.postHandshakeStats(func(cs *ConnStats) {
1444                 cs.BytesRead.Add(c.stats.BytesRead.Int64())
1445                 cs.BytesWritten.Add(c.stats.BytesWritten.Int64())
1446         })
1447         c.reconciledHandshakeStats = true
1448 }
1449
1450 // Returns true if the connection is added.
1451 func (t *Torrent) addConnection(c *connection) (err error) {
1452         defer func() {
1453                 if err == nil {
1454                         torrent.Add("added connections", 1)
1455                 }
1456         }()
1457         if t.closed.IsSet() {
1458                 return errors.New("torrent closed")
1459         }
1460         for c0 := range t.conns {
1461                 if c.PeerID != c0.PeerID {
1462                         continue
1463                 }
1464                 if !t.cl.config.dropDuplicatePeerIds {
1465                         continue
1466                 }
1467                 if left, ok := c.hasPreferredNetworkOver(c0); ok && left {
1468                         c0.Close()
1469                         t.deleteConnection(c0)
1470                 } else {
1471                         return errors.New("existing connection preferred")
1472                 }
1473         }
1474         if len(t.conns) >= t.maxEstablishedConns {
1475                 c := t.worstBadConn()
1476                 if c == nil {
1477                         return errors.New("don't want conns")
1478                 }
1479                 c.Close()
1480                 t.deleteConnection(c)
1481         }
1482         if len(t.conns) >= t.maxEstablishedConns {
1483                 panic(len(t.conns))
1484         }
1485         t.conns[c] = struct{}{}
1486         return nil
1487 }
1488
1489 func (t *Torrent) wantConns() bool {
1490         if !t.networkingEnabled {
1491                 return false
1492         }
1493         if t.closed.IsSet() {
1494                 return false
1495         }
1496         if !t.seeding() && !t.needData() {
1497                 return false
1498         }
1499         if len(t.conns) < t.maxEstablishedConns {
1500                 return true
1501         }
1502         return t.worstBadConn() != nil
1503 }
1504
1505 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1506         t.cl.lock()
1507         defer t.cl.unlock()
1508         oldMax = t.maxEstablishedConns
1509         t.maxEstablishedConns = max
1510         wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
1511         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1512                 t.dropConnection(wcs.Pop().(*connection))
1513         }
1514         t.openNewConns()
1515         return oldMax
1516 }
1517
1518 func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) {
1519         t.logger.Log(log.Fstr("hashed piece %d (passed=%t)", piece, correct).WithValues(debugLogValue))
1520         p := t.piece(piece)
1521         p.numVerifies++
1522         t.cl.event.Broadcast()
1523         if t.closed.IsSet() {
1524                 return
1525         }
1526         touchers := t.reapPieceTouchers(piece)
1527         if p.storageCompletionOk {
1528                 // Don't score the first time a piece is hashed, it could be an
1529                 // initial check.
1530                 if correct {
1531                         pieceHashedCorrect.Add(1)
1532                 } else {
1533                         log.Fmsg("piece %d failed hash: %d connections contributed", piece, len(touchers)).AddValues(t, p).Log(t.logger)
1534                         pieceHashedNotCorrect.Add(1)
1535                 }
1536         }
1537         if correct {
1538                 if len(touchers) != 0 {
1539                         // Don't increment stats above connection-level for every involved
1540                         // connection.
1541                         t.allStats((*ConnStats).incrementPiecesDirtiedGood)
1542                 }
1543                 for _, c := range touchers {
1544                         c.stats.incrementPiecesDirtiedGood()
1545                 }
1546                 err := p.Storage().MarkComplete()
1547                 if err != nil {
1548                         t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
1549                 }
1550         } else {
1551                 if len(touchers) != 0 {
1552                         // Don't increment stats above connection-level for every involved connection.
1553                         t.allStats((*ConnStats).incrementPiecesDirtiedBad)
1554                         for _, c := range touchers {
1555                                 // Y u do dis peer?!
1556                                 c.stats.incrementPiecesDirtiedBad()
1557                         }
1558                         slices.Sort(touchers, connLessTrusted)
1559                         if t.cl.config.Debug {
1560                                 t.logger.Printf("conns by trust for piece %d: %v",
1561                                         piece,
1562                                         func() (ret []connectionTrust) {
1563                                                 for _, c := range touchers {
1564                                                         ret = append(ret, c.trust())
1565                                                 }
1566                                                 return
1567                                         }())
1568                         }
1569                         c := touchers[0]
1570                         if !c.trust().Implicit {
1571                                 t.cl.banPeerIP(c.remoteAddr.IP)
1572                                 c.Drop()
1573                         }
1574                 }
1575                 t.onIncompletePiece(piece)
1576                 p.Storage().MarkNotComplete()
1577         }
1578         t.updatePieceCompletion(piece)
1579 }
1580
1581 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
1582         // TODO: Make faster
1583         for cn := range t.conns {
1584                 cn.tickleWriter()
1585         }
1586 }
1587
1588 func (t *Torrent) onPieceCompleted(piece pieceIndex) {
1589         t.pendAllChunkSpecs(piece)
1590         t.cancelRequestsForPiece(piece)
1591         for conn := range t.conns {
1592                 conn.Have(piece)
1593         }
1594 }
1595
1596 // Called when a piece is found to be not complete.
1597 func (t *Torrent) onIncompletePiece(piece pieceIndex) {
1598         if t.pieceAllDirty(piece) {
1599                 t.pendAllChunkSpecs(piece)
1600         }
1601         if !t.wantPieceIndex(piece) {
1602                 // t.logger.Printf("piece %d incomplete and unwanted", piece)
1603                 return
1604         }
1605         // We could drop any connections that we told we have a piece that we
1606         // don't here. But there's a test failure, and it seems clients don't care
1607         // if you request pieces that you already claim to have. Pruning bad
1608         // connections might just remove any connections that aren't treating us
1609         // favourably anyway.
1610
1611         // for c := range t.conns {
1612         //      if c.sentHave(piece) {
1613         //              c.Drop()
1614         //      }
1615         // }
1616         for conn := range t.conns {
1617                 if conn.PeerHasPiece(piece) {
1618                         conn.updateRequests()
1619                 }
1620         }
1621 }
1622
1623 func (t *Torrent) tryCreateMorePieceHashers() {
1624         for t.activePieceHashes < 2 && t.tryCreatePieceHasher() {
1625         }
1626 }
1627
1628 func (t *Torrent) tryCreatePieceHasher() bool {
1629         if t.storage == nil {
1630                 return false
1631         }
1632         pi, ok := t.getPieceToHash()
1633         if !ok {
1634                 return false
1635         }
1636         p := t.piece(pi)
1637         t.piecesQueuedForHash.Remove(pi)
1638         p.hashing = true
1639         t.publishPieceChange(pi)
1640         t.updatePiecePriority(pi)
1641         t.storageLock.RLock()
1642         t.activePieceHashes++
1643         go t.pieceHasher(pi)
1644         return true
1645 }
1646
1647 func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
1648         t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool {
1649                 if t.piece(i).hashing {
1650                         return true
1651                 }
1652                 ret = i
1653                 ok = true
1654                 return false
1655         })
1656         return
1657 }
1658
1659 func (t *Torrent) pieceHasher(index pieceIndex) {
1660         p := t.piece(index)
1661         sum := t.hashPiece(index)
1662         t.storageLock.RUnlock()
1663         t.cl.lock()
1664         defer t.cl.unlock()
1665         p.hashing = false
1666         t.updatePiecePriority(index)
1667         t.pieceHashed(index, sum == *p.hash)
1668         t.publishPieceChange(index)
1669         t.activePieceHashes--
1670         t.tryCreateMorePieceHashers()
1671 }
1672
1673 // Return the connections that touched a piece, and clear the entries while
1674 // doing it.
1675 func (t *Torrent) reapPieceTouchers(piece pieceIndex) (ret []*connection) {
1676         for c := range t.pieces[piece].dirtiers {
1677                 delete(c.peerTouchedPieces, piece)
1678                 ret = append(ret, c)
1679         }
1680         t.pieces[piece].dirtiers = nil
1681         return
1682 }
1683
1684 func (t *Torrent) connsAsSlice() (ret []*connection) {
1685         for c := range t.conns {
1686                 ret = append(ret, c)
1687         }
1688         return
1689 }
1690
1691 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
1692         piece := t.piece(pieceIndex)
1693         if piece.queuedForHash() {
1694                 return
1695         }
1696         t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
1697         t.publishPieceChange(pieceIndex)
1698         t.updatePiecePriority(pieceIndex)
1699         t.tryCreateMorePieceHashers()
1700 }
1701
1702 // Forces all the pieces to be re-hashed. See also Piece.VerifyData. This should not be called
1703 // before the Info is available.
1704 func (t *Torrent) VerifyData() {
1705         for i := pieceIndex(0); i < t.NumPieces(); i++ {
1706                 t.Piece(i).VerifyData()
1707         }
1708 }
1709
1710 // Start the process of connecting to the given peer for the given torrent if appropriate.
1711 func (t *Torrent) initiateConn(peer Peer) {
1712         if peer.Id == t.cl.peerID {
1713                 return
1714         }
1715         if t.cl.badPeerIPPort(peer.IP, peer.Port) && !peer.Trusted {
1716                 return
1717         }
1718         addr := IpPort{peer.IP, uint16(peer.Port)}
1719         if t.addrActive(addr.String()) {
1720                 return
1721         }
1722         t.halfOpen[addr.String()] = peer
1723         go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted)
1724 }
1725
1726 // Adds each a trusted, pending peer for each of the Client's addresses.
1727 func (t *Torrent) AddClientPeer(cl *Client) {
1728         t.AddPeers(func() (ps []Peer) {
1729                 for _, la := range cl.ListenAddrs() {
1730                         ps = append(ps, Peer{
1731                                 IP:      missinggo.AddrIP(la),
1732                                 Port:    missinggo.AddrPort(la),
1733                                 Trusted: true,
1734                         })
1735                 }
1736                 return
1737         }())
1738 }
1739
1740 // All stats that include this Torrent. Useful when we want to increment
1741 // ConnStats but not for every connection.
1742 func (t *Torrent) allStats(f func(*ConnStats)) {
1743         f(&t.stats)
1744         f(&t.cl.stats)
1745 }
1746
1747 func (t *Torrent) hashingPiece(i pieceIndex) bool {
1748         return t.pieces[i].hashing
1749 }
1750
1751 func (t *Torrent) pieceQueuedForHash(i pieceIndex) bool {
1752         return t.piecesQueuedForHash.Get(bitmap.BitIndex(i))
1753 }
1754
1755 func (t *Torrent) dialTimeout() time.Duration {
1756         return reducedDialTimeout(t.cl.config.MinDialTimeout, t.cl.config.NominalDialTimeout, t.cl.config.HalfOpenConnsPerTorrent, t.peers.Len())
1757 }
1758
1759 func (t *Torrent) piece(i int) *Piece {
1760         return &t.pieces[i]
1761 }