]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
5fd2bbff3df2ef6a365459333bc188814e777a13
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "crypto/sha1"
6         "errors"
7         "fmt"
8         "io"
9         "math/rand"
10         "net/url"
11         "os"
12         "sync"
13         "text/tabwriter"
14         "time"
15         "unsafe"
16
17         "github.com/davecgh/go-spew/spew"
18
19         "github.com/anacrolix/dht/v2"
20         "github.com/anacrolix/log"
21         "github.com/anacrolix/missinggo"
22         "github.com/anacrolix/missinggo/bitmap"
23         "github.com/anacrolix/missinggo/perf"
24         "github.com/anacrolix/missinggo/prioritybitmap"
25         "github.com/anacrolix/missinggo/pubsub"
26         "github.com/anacrolix/missinggo/slices"
27
28         "github.com/anacrolix/torrent/bencode"
29         "github.com/anacrolix/torrent/metainfo"
30         pp "github.com/anacrolix/torrent/peer_protocol"
31         "github.com/anacrolix/torrent/storage"
32         "github.com/anacrolix/torrent/tracker"
33 )
34
35 func (t *Torrent) chunkIndexSpec(chunkIndex pp.Integer, piece pieceIndex) chunkSpec {
36         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
37 }
38
39 // Maintains state of torrent within a Client.
40 type Torrent struct {
41         // Torrent-level aggregate statistics. First in struct to ensure 64-bit
42         // alignment. See #262.
43         stats  ConnStats
44         cl     *Client
45         logger log.Logger
46
47         networkingEnabled bool
48
49         // Determines what chunks to request from peers. 1: Favour higher priority
50         // pieces with some fuzzing to reduce overlaps and wastage across
51         // connections. 2: The fastest connection downloads strictly in order of
52         // priority, while all others adher to their piece inclications. 3:
53         // Requests are strictly by piece priority, and not duplicated until
54         // duplicateRequestTimeout is reached.
55         requestStrategy int
56         // How long to avoid duplicating a pending request.
57         duplicateRequestTimeout time.Duration
58
59         closed   missinggo.Event
60         infoHash metainfo.Hash
61         pieces   []Piece
62         // Values are the piece indices that changed.
63         pieceStateChanges *pubsub.PubSub
64         // The size of chunks to request from peers over the wire. This is
65         // normally 16KiB by convention these days.
66         chunkSize pp.Integer
67         chunkPool *sync.Pool
68         // Total length of the torrent in bytes. Stored because it's not O(1) to
69         // get this from the info dict.
70         length *int64
71
72         // The storage to open when the info dict becomes available.
73         storageOpener *storage.Client
74         // Storage for torrent data.
75         storage *storage.Torrent
76         // Read-locked for using storage, and write-locked for Closing.
77         storageLock sync.RWMutex
78
79         // TODO: Only announce stuff is used?
80         metainfo metainfo.MetaInfo
81
82         // The info dict. nil if we don't have it (yet).
83         info  *metainfo.Info
84         files *[]*File
85
86         // Active peer connections, running message stream loops. TODO: Make this
87         // open (not-closed) connections only.
88         conns               map[*connection]struct{}
89         maxEstablishedConns int
90         // Set of addrs to which we're attempting to connect. Connections are
91         // half-open until all handshakes are completed.
92         halfOpen    map[string]Peer
93         fastestConn *connection
94
95         // Reserve of peers to connect to. A peer can be both here and in the
96         // active connections if were told about the peer after connecting with
97         // them. That encourages us to reconnect to peers that are well known in
98         // the swarm.
99         peers          prioritizedPeers
100         wantPeersEvent missinggo.Event
101         // An announcer for each tracker URL.
102         trackerAnnouncers map[string]*trackerScraper
103         // How many times we've initiated a DHT announce. TODO: Move into stats.
104         numDHTAnnounces int
105
106         // Name used if the info name isn't available. Should be cleared when the
107         // Info does become available.
108         nameMu      sync.RWMutex
109         displayName string
110
111         // The bencoded bytes of the info dict. This is actively manipulated if
112         // the info bytes aren't initially available, and we try to fetch them
113         // from peers.
114         metadataBytes []byte
115         // Each element corresponds to the 16KiB metadata pieces. If true, we have
116         // received that piece.
117         metadataCompletedChunks []bool
118         metadataChanged         sync.Cond
119
120         // Set when .Info is obtained.
121         gotMetainfo missinggo.Event
122
123         readers               map[*reader]struct{}
124         readerNowPieces       bitmap.Bitmap
125         readerReadaheadPieces bitmap.Bitmap
126
127         // A cache of pieces we need to get. Calculated from various piece and
128         // file priorities and completion states elsewhere.
129         pendingPieces prioritybitmap.PriorityBitmap
130         // A cache of completed piece indices.
131         completedPieces bitmap.Bitmap
132         // Pieces that need to be hashed.
133         piecesQueuedForHash bitmap.Bitmap
134
135         // A pool of piece priorities []int for assignment to new connections.
136         // These "inclinations" are used to give connections preference for
137         // different pieces.
138         connPieceInclinationPool sync.Pool
139
140         // Count of each request across active connections.
141         pendingRequests map[request]int
142         // The last time we requested a chunk. Deleting the request from any
143         // connection will clear this value.
144         lastRequested map[request]*time.Timer
145 }
146
147 func (t *Torrent) tickleReaders() {
148         t.cl.event.Broadcast()
149 }
150
151 // Returns a channel that is closed when the Torrent is closed.
152 func (t *Torrent) Closed() <-chan struct{} {
153         return t.closed.LockedChan(t.cl.locker())
154 }
155
156 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
157 // pending, and half-open peers.
158 func (t *Torrent) KnownSwarm() (ks []Peer) {
159         // Add pending peers to the list
160         t.peers.Each(func(peer Peer) {
161                 ks = append(ks, peer)
162         })
163
164         // Add half-open peers to the list
165         for _, peer := range t.halfOpen {
166                 ks = append(ks, peer)
167         }
168
169         // Add active peers to the list
170         for conn := range t.conns {
171
172                 ks = append(ks, Peer{
173                         Id:     conn.PeerID,
174                         IP:     conn.remoteAddr.IP,
175                         Port:   int(conn.remoteAddr.Port),
176                         Source: conn.Discovery,
177                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
178                         // > But if we're not connected to them with an encrypted connection, I couldn't say
179                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
180                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
181                         // > connection. It's probably easiest to do the latter for now.
182                         // https://github.com/anacrolix/torrent/pull/188
183                         SupportsEncryption: conn.headerEncrypted,
184                 })
185         }
186
187         return
188 }
189
190 func (t *Torrent) setChunkSize(size pp.Integer) {
191         t.chunkSize = size
192         t.chunkPool = &sync.Pool{
193                 New: func() interface{} {
194                         b := make([]byte, size)
195                         return &b
196                 },
197         }
198 }
199
200 func (t *Torrent) pieceComplete(piece pieceIndex) bool {
201         return t.completedPieces.Get(bitmap.BitIndex(piece))
202 }
203
204 func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
205         return t.pieces[piece].Storage().Completion()
206 }
207
208 // There's a connection to that address already.
209 func (t *Torrent) addrActive(addr string) bool {
210         if _, ok := t.halfOpen[addr]; ok {
211                 return true
212         }
213         for c := range t.conns {
214                 ra := c.remoteAddr
215                 if ra.String() == addr {
216                         return true
217                 }
218         }
219         return false
220 }
221
222 func (t *Torrent) unclosedConnsAsSlice() (ret []*connection) {
223         ret = make([]*connection, 0, len(t.conns))
224         for c := range t.conns {
225                 if !c.closed.IsSet() {
226                         ret = append(ret, c)
227                 }
228         }
229         return
230 }
231
232 func (t *Torrent) addPeer(p Peer) {
233         cl := t.cl
234         peersAddedBySource.Add(string(p.Source), 1)
235         if t.closed.IsSet() {
236                 return
237         }
238         if cl.badPeerIPPort(p.IP, p.Port) {
239                 torrent.Add("peers not added because of bad addr", 1)
240                 return
241         }
242         if t.peers.Add(p) {
243                 torrent.Add("peers replaced", 1)
244         }
245         t.openNewConns()
246         for t.peers.Len() > cl.config.TorrentPeersHighWater {
247                 _, ok := t.peers.DeleteMin()
248                 if ok {
249                         torrent.Add("excess reserve peers discarded", 1)
250                 }
251         }
252 }
253
254 func (t *Torrent) invalidateMetadata() {
255         for i := range t.metadataCompletedChunks {
256                 t.metadataCompletedChunks[i] = false
257         }
258         t.nameMu.Lock()
259         t.info = nil
260         t.nameMu.Unlock()
261 }
262
263 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
264         if t.haveInfo() {
265                 return
266         }
267         if index >= len(t.metadataCompletedChunks) {
268                 t.logger.Printf("%s: ignoring metadata piece %d", t, index)
269                 return
270         }
271         copy(t.metadataBytes[(1<<14)*index:], data)
272         t.metadataCompletedChunks[index] = true
273 }
274
275 func (t *Torrent) metadataPieceCount() int {
276         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
277 }
278
279 func (t *Torrent) haveMetadataPiece(piece int) bool {
280         if t.haveInfo() {
281                 return (1<<14)*piece < len(t.metadataBytes)
282         } else {
283                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
284         }
285 }
286
287 func (t *Torrent) metadataSize() int {
288         return len(t.metadataBytes)
289 }
290
291 func infoPieceHashes(info *metainfo.Info) (ret [][]byte) {
292         for i := 0; i < len(info.Pieces); i += sha1.Size {
293                 ret = append(ret, info.Pieces[i:i+sha1.Size])
294         }
295         return
296 }
297
298 func (t *Torrent) makePieces() {
299         hashes := infoPieceHashes(t.info)
300         t.pieces = make([]Piece, len(hashes), len(hashes))
301         for i, hash := range hashes {
302                 piece := &t.pieces[i]
303                 piece.t = t
304                 piece.index = pieceIndex(i)
305                 piece.noPendingWrites.L = &piece.pendingWritesMutex
306                 piece.hash = (*metainfo.Hash)(unsafe.Pointer(&hash[0]))
307                 files := *t.files
308                 beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
309                 endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
310                 piece.files = files[beginFile:endFile]
311         }
312 }
313
314 // Returns the index of the first file containing the piece. files must be
315 // ordered by offset.
316 func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
317         for i, f := range files {
318                 if f.offset+f.length > pieceOffset {
319                         return i
320                 }
321         }
322         return 0
323 }
324
325 // Returns the index after the last file containing the piece. files must be
326 // ordered by offset.
327 func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
328         for i, f := range files {
329                 if f.offset+f.length >= pieceEndOffset {
330                         return i + 1
331                 }
332         }
333         return 0
334 }
335
336 func (t *Torrent) cacheLength() {
337         var l int64
338         for _, f := range t.info.UpvertedFiles() {
339                 l += f.Length
340         }
341         t.length = &l
342 }
343
344 func (t *Torrent) setInfo(info *metainfo.Info) error {
345         if err := validateInfo(info); err != nil {
346                 return fmt.Errorf("bad info: %s", err)
347         }
348         if t.storageOpener != nil {
349                 var err error
350                 t.storage, err = t.storageOpener.OpenTorrent(info, t.infoHash)
351                 if err != nil {
352                         return fmt.Errorf("error opening torrent storage: %s", err)
353                 }
354         }
355         t.nameMu.Lock()
356         t.info = info
357         t.nameMu.Unlock()
358         t.displayName = "" // Save a few bytes lol.
359         t.initFiles()
360         t.cacheLength()
361         t.makePieces()
362         return nil
363 }
364
365 func (t *Torrent) onSetInfo() {
366         for conn := range t.conns {
367                 if err := conn.setNumPieces(t.numPieces()); err != nil {
368                         t.logger.Printf("closing connection: %s", err)
369                         conn.Close()
370                 }
371         }
372         for i := range t.pieces {
373                 t.updatePieceCompletion(pieceIndex(i))
374                 p := &t.pieces[i]
375                 if !p.storageCompletionOk {
376                         // t.logger.Printf("piece %s completion unknown, queueing check", p)
377                         t.queuePieceCheck(pieceIndex(i))
378                 }
379         }
380         t.cl.event.Broadcast()
381         t.gotMetainfo.Set()
382         t.updateWantPeersEvent()
383         t.pendingRequests = make(map[request]int)
384         t.lastRequested = make(map[request]*time.Timer)
385 }
386
387 // Called when metadata for a torrent becomes available.
388 func (t *Torrent) setInfoBytes(b []byte) error {
389         if metainfo.HashBytes(b) != t.infoHash {
390                 return errors.New("info bytes have wrong hash")
391         }
392         var info metainfo.Info
393         if err := bencode.Unmarshal(b, &info); err != nil {
394                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
395         }
396         if err := t.setInfo(&info); err != nil {
397                 return err
398         }
399         t.metadataBytes = b
400         t.metadataCompletedChunks = nil
401         t.onSetInfo()
402         return nil
403 }
404
405 func (t *Torrent) haveAllMetadataPieces() bool {
406         if t.haveInfo() {
407                 return true
408         }
409         if t.metadataCompletedChunks == nil {
410                 return false
411         }
412         for _, have := range t.metadataCompletedChunks {
413                 if !have {
414                         return false
415                 }
416         }
417         return true
418 }
419
420 // TODO: Propagate errors to disconnect peer.
421 func (t *Torrent) setMetadataSize(bytes int) (err error) {
422         if t.haveInfo() {
423                 // We already know the correct metadata size.
424                 return
425         }
426         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
427                 return errors.New("bad size")
428         }
429         if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
430                 return
431         }
432         t.metadataBytes = make([]byte, bytes)
433         t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
434         t.metadataChanged.Broadcast()
435         for c := range t.conns {
436                 c.requestPendingMetadata()
437         }
438         return
439 }
440
441 // The current working name for the torrent. Either the name in the info dict,
442 // or a display name given such as by the dn value in a magnet link, or "".
443 func (t *Torrent) name() string {
444         t.nameMu.RLock()
445         defer t.nameMu.RUnlock()
446         if t.haveInfo() {
447                 return t.info.Name
448         }
449         return t.displayName
450 }
451
452 func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) {
453         p := &t.pieces[index]
454         ret.Priority = t.piecePriority(index)
455         ret.Completion = p.completion()
456         if p.queuedForHash() || p.hashing {
457                 ret.Checking = true
458         }
459         if !ret.Complete && t.piecePartiallyDownloaded(index) {
460                 ret.Partial = true
461         }
462         return
463 }
464
465 func (t *Torrent) metadataPieceSize(piece int) int {
466         return metadataPieceSize(len(t.metadataBytes), piece)
467 }
468
469 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
470         d := map[string]int{
471                 "msg_type": msgType,
472                 "piece":    piece,
473         }
474         if data != nil {
475                 d["total_size"] = len(t.metadataBytes)
476         }
477         p := bencode.MustMarshal(d)
478         return pp.Message{
479                 Type:            pp.Extended,
480                 ExtendedID:      c.PeerExtensionIDs[pp.ExtensionNameMetadata],
481                 ExtendedPayload: append(p, data...),
482         }
483 }
484
485 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
486         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
487                 ret = append(ret, PieceStateRun{
488                         PieceState: el.(PieceState),
489                         Length:     int(count),
490                 })
491         })
492         for index := range t.pieces {
493                 rle.Append(t.pieceState(pieceIndex(index)), 1)
494         }
495         rle.Flush()
496         return
497 }
498
499 // Produces a small string representing a PieceStateRun.
500 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
501         ret = fmt.Sprintf("%d", psr.Length)
502         ret += func() string {
503                 switch psr.Priority {
504                 case PiecePriorityNext:
505                         return "N"
506                 case PiecePriorityNormal:
507                         return "."
508                 case PiecePriorityReadahead:
509                         return "R"
510                 case PiecePriorityNow:
511                         return "!"
512                 case PiecePriorityHigh:
513                         return "H"
514                 default:
515                         return ""
516                 }
517         }()
518         if psr.Checking {
519                 ret += "H"
520         }
521         if psr.Partial {
522                 ret += "P"
523         }
524         if psr.Complete {
525                 ret += "C"
526         }
527         if !psr.Ok {
528                 ret += "?"
529         }
530         return
531 }
532
533 func (t *Torrent) writeStatus(w io.Writer) {
534         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
535         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
536         if !t.haveInfo() {
537                 fmt.Fprintf(w, "Metadata have: ")
538                 for _, h := range t.metadataCompletedChunks {
539                         fmt.Fprintf(w, "%c", func() rune {
540                                 if h {
541                                         return 'H'
542                                 } else {
543                                         return '.'
544                                 }
545                         }())
546                 }
547                 fmt.Fprintln(w)
548         }
549         fmt.Fprintf(w, "Piece length: %s\n", func() string {
550                 if t.haveInfo() {
551                         return fmt.Sprint(t.usualPieceSize())
552                 } else {
553                         return "?"
554                 }
555         }())
556         if t.info != nil {
557                 fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
558                 fmt.Fprint(w, "Piece States:")
559                 for _, psr := range t.pieceStateRuns() {
560                         w.Write([]byte(" "))
561                         w.Write([]byte(pieceStateRunStatusChars(psr)))
562                 }
563                 fmt.Fprintln(w)
564         }
565         fmt.Fprintf(w, "Reader Pieces:")
566         t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
567                 fmt.Fprintf(w, " %d:%d", begin, end)
568                 return true
569         })
570         fmt.Fprintln(w)
571
572         fmt.Fprintf(w, "Enabled trackers:\n")
573         func() {
574                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
575                 fmt.Fprintf(tw, "    URL\tNext announce\tLast announce\n")
576                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
577                         return l.u.String() < r.u.String()
578                 }).([]*trackerScraper) {
579                         fmt.Fprintf(tw, "    %s\n", ta.statusLine())
580                 }
581                 tw.Flush()
582         }()
583
584         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
585
586         spew.NewDefaultConfig()
587         spew.Fdump(w, t.statsLocked())
588
589         conns := t.connsAsSlice()
590         slices.Sort(conns, worseConn)
591         for i, c := range conns {
592                 fmt.Fprintf(w, "%2d. ", i+1)
593                 c.WriteStatus(w, t)
594         }
595 }
596
597 func (t *Torrent) haveInfo() bool {
598         return t.info != nil
599 }
600
601 // Returns a run-time generated MetaInfo that includes the info bytes and
602 // announce-list as currently known to the client.
603 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
604         return metainfo.MetaInfo{
605                 CreationDate: time.Now().Unix(),
606                 Comment:      "dynamic metainfo from client",
607                 CreatedBy:    "go.torrent",
608                 AnnounceList: t.metainfo.UpvertedAnnounceList(),
609                 InfoBytes: func() []byte {
610                         if t.haveInfo() {
611                                 return t.metadataBytes
612                         } else {
613                                 return nil
614                         }
615                 }(),
616         }
617 }
618
619 func (t *Torrent) BytesMissing() int64 {
620         t.cl.rLock()
621         defer t.cl.rUnlock()
622         return t.bytesMissingLocked()
623 }
624
625 func (t *Torrent) bytesMissingLocked() int64 {
626         return t.bytesLeft()
627 }
628
629 func (t *Torrent) bytesLeft() (left int64) {
630         bitmap.Flip(t.completedPieces, 0, bitmap.BitIndex(t.numPieces())).IterTyped(func(piece int) bool {
631                 p := &t.pieces[piece]
632                 left += int64(p.length() - p.numDirtyBytes())
633                 return true
634         })
635         return
636 }
637
638 // Bytes left to give in tracker announces.
639 func (t *Torrent) bytesLeftAnnounce() int64 {
640         if t.haveInfo() {
641                 return t.bytesLeft()
642         } else {
643                 return -1
644         }
645 }
646
647 func (t *Torrent) piecePartiallyDownloaded(piece pieceIndex) bool {
648         if t.pieceComplete(piece) {
649                 return false
650         }
651         if t.pieceAllDirty(piece) {
652                 return false
653         }
654         return t.pieces[piece].hasDirtyChunks()
655 }
656
657 func (t *Torrent) usualPieceSize() int {
658         return int(t.info.PieceLength)
659 }
660
661 func (t *Torrent) numPieces() pieceIndex {
662         return pieceIndex(t.info.NumPieces())
663 }
664
665 func (t *Torrent) numPiecesCompleted() (num int) {
666         return t.completedPieces.Len()
667 }
668
669 func (t *Torrent) close() (err error) {
670         t.closed.Set()
671         t.tickleReaders()
672         if t.storage != nil {
673                 t.storageLock.Lock()
674                 t.storage.Close()
675                 t.storageLock.Unlock()
676         }
677         for conn := range t.conns {
678                 conn.Close()
679         }
680         t.cl.event.Broadcast()
681         t.pieceStateChanges.Close()
682         t.updateWantPeersEvent()
683         return
684 }
685
686 func (t *Torrent) requestOffset(r request) int64 {
687         return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
688 }
689
690 // Return the request that would include the given offset into the torrent
691 // data. Returns !ok if there is no such request.
692 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
693         return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
694 }
695
696 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
697         defer perf.ScopeTimerErr(&err)()
698         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
699         if err == nil && n != len(data) {
700                 err = io.ErrShortWrite
701         }
702         return
703 }
704
705 func (t *Torrent) bitfield() (bf []bool) {
706         bf = make([]bool, t.numPieces())
707         t.completedPieces.IterTyped(func(piece int) (again bool) {
708                 bf[piece] = true
709                 return true
710         })
711         return
712 }
713
714 func (t *Torrent) pieceNumChunks(piece pieceIndex) pp.Integer {
715         return (t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize
716 }
717
718 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
719         t.pieces[pieceIndex].dirtyChunks.Clear()
720 }
721
722 func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
723         if t.info.PieceLength == 0 {
724                 // There will be no variance amongst pieces. Only pain.
725                 return 0
726         }
727         if piece == t.numPieces()-1 {
728                 ret := pp.Integer(*t.length % t.info.PieceLength)
729                 if ret != 0 {
730                         return ret
731                 }
732         }
733         return pp.Integer(t.info.PieceLength)
734 }
735
736 func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash) {
737         hash := pieceHash.New()
738         p := &t.pieces[piece]
739         p.waitNoPendingWrites()
740         ip := t.info.Piece(int(piece))
741         pl := ip.Length()
742         n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
743         if n == pl {
744                 missinggo.CopyExact(&ret, hash.Sum(nil))
745                 return
746         }
747         if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
748                 t.logger.Printf("unexpected error hashing piece %d through %T: %s", piece, t.storage.TorrentImpl, err)
749         }
750         return
751 }
752
753 func (t *Torrent) haveAnyPieces() bool {
754         return t.completedPieces.Len() != 0
755 }
756
757 func (t *Torrent) haveAllPieces() bool {
758         if !t.haveInfo() {
759                 return false
760         }
761         return t.completedPieces.Len() == bitmap.BitIndex(t.numPieces())
762 }
763
764 func (t *Torrent) havePiece(index pieceIndex) bool {
765         return t.haveInfo() && t.pieceComplete(index)
766 }
767
768 func (t *Torrent) haveChunk(r request) (ret bool) {
769         // defer func() {
770         //      log.Println("have chunk", r, ret)
771         // }()
772         if !t.haveInfo() {
773                 return false
774         }
775         if t.pieceComplete(pieceIndex(r.Index)) {
776                 return true
777         }
778         p := &t.pieces[r.Index]
779         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
780 }
781
782 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
783         return int(cs.Begin / chunkSize)
784 }
785
786 func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
787         if !t.haveInfo() {
788                 return false
789         }
790         if index < 0 || index >= t.numPieces() {
791                 return false
792         }
793         p := &t.pieces[index]
794         if p.queuedForHash() {
795                 return false
796         }
797         if p.hashing {
798                 return false
799         }
800         if t.pieceComplete(index) {
801                 return false
802         }
803         if t.pendingPieces.Contains(bitmap.BitIndex(index)) {
804                 return true
805         }
806         // t.logger.Printf("piece %d not pending", index)
807         return !t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
808                 return index < begin || index >= end
809         })
810 }
811
812 // The worst connection is one that hasn't been sent, or sent anything useful
813 // for the longest. A bad connection is one that usually sends us unwanted
814 // pieces, or has been in worser half of the established connections for more
815 // than a minute.
816 func (t *Torrent) worstBadConn() *connection {
817         wcs := worseConnSlice{t.unclosedConnsAsSlice()}
818         heap.Init(&wcs)
819         for wcs.Len() != 0 {
820                 c := heap.Pop(&wcs).(*connection)
821                 if c.stats.ChunksReadWasted.Int64() >= 6 && c.stats.ChunksReadWasted.Int64() > c.stats.ChunksReadUseful.Int64() {
822                         return c
823                 }
824                 // If the connection is in the worst half of the established
825                 // connection quota and is older than a minute.
826                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
827                         // Give connections 1 minute to prove themselves.
828                         if time.Since(c.completedHandshake) > time.Minute {
829                                 return c
830                         }
831                 }
832         }
833         return nil
834 }
835
836 type PieceStateChange struct {
837         Index int
838         PieceState
839 }
840
841 func (t *Torrent) publishPieceChange(piece pieceIndex) {
842         cur := t.pieceState(piece)
843         p := &t.pieces[piece]
844         if cur != p.publicPieceState {
845                 p.publicPieceState = cur
846                 t.pieceStateChanges.Publish(PieceStateChange{
847                         int(piece),
848                         cur,
849                 })
850         }
851 }
852
853 func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
854         if t.pieceComplete(piece) {
855                 return 0
856         }
857         return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
858 }
859
860 func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
861         return t.pieces[piece].dirtyChunks.Len() == int(t.pieceNumChunks(piece))
862 }
863
864 func (t *Torrent) readersChanged() {
865         t.updateReaderPieces()
866         t.updateAllPiecePriorities()
867 }
868
869 func (t *Torrent) updateReaderPieces() {
870         t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities()
871 }
872
873 func (t *Torrent) readerPosChanged(from, to pieceRange) {
874         if from == to {
875                 return
876         }
877         t.updateReaderPieces()
878         // Order the ranges, high and low.
879         l, h := from, to
880         if l.begin > h.begin {
881                 l, h = h, l
882         }
883         if l.end < h.begin {
884                 // Two distinct ranges.
885                 t.updatePiecePriorities(l.begin, l.end)
886                 t.updatePiecePriorities(h.begin, h.end)
887         } else {
888                 // Ranges overlap.
889                 end := l.end
890                 if h.end > end {
891                         end = h.end
892                 }
893                 t.updatePiecePriorities(l.begin, end)
894         }
895 }
896
897 func (t *Torrent) maybeNewConns() {
898         // Tickle the accept routine.
899         t.cl.event.Broadcast()
900         t.openNewConns()
901 }
902
903 func (t *Torrent) piecePriorityChanged(piece pieceIndex) {
904         // t.logger.Printf("piece %d priority changed", piece)
905         for c := range t.conns {
906                 if c.updatePiecePriority(piece) {
907                         // log.Print("conn piece priority changed")
908                         c.updateRequests()
909                 }
910         }
911         t.maybeNewConns()
912         t.publishPieceChange(piece)
913 }
914
915 func (t *Torrent) updatePiecePriority(piece pieceIndex) {
916         p := &t.pieces[piece]
917         newPrio := p.uncachedPriority()
918         // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
919         if newPrio == PiecePriorityNone {
920                 if !t.pendingPieces.Remove(bitmap.BitIndex(piece)) {
921                         return
922                 }
923         } else {
924                 if !t.pendingPieces.Set(bitmap.BitIndex(piece), newPrio.BitmapPriority()) {
925                         return
926                 }
927         }
928         t.piecePriorityChanged(piece)
929 }
930
931 func (t *Torrent) updateAllPiecePriorities() {
932         t.updatePiecePriorities(0, t.numPieces())
933 }
934
935 // Update all piece priorities in one hit. This function should have the same
936 // output as updatePiecePriority, but across all pieces.
937 func (t *Torrent) updatePiecePriorities(begin, end pieceIndex) {
938         for i := begin; i < end; i++ {
939                 t.updatePiecePriority(i)
940         }
941 }
942
943 // Returns the range of pieces [begin, end) that contains the extent of bytes.
944 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
945         if off >= *t.length {
946                 return
947         }
948         if off < 0 {
949                 size += off
950                 off = 0
951         }
952         if size <= 0 {
953                 return
954         }
955         begin = pieceIndex(off / t.info.PieceLength)
956         end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
957         if end > pieceIndex(t.info.NumPieces()) {
958                 end = pieceIndex(t.info.NumPieces())
959         }
960         return
961 }
962
963 // Returns true if all iterations complete without breaking. Returns the read
964 // regions for all readers. The reader regions should not be merged as some
965 // callers depend on this method to enumerate readers.
966 func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool)) (all bool) {
967         for r := range t.readers {
968                 p := r.pieces
969                 if p.begin >= p.end {
970                         continue
971                 }
972                 if !f(p.begin, p.end) {
973                         return false
974                 }
975         }
976         return true
977 }
978
979 func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
980         prio, ok := t.pendingPieces.GetPriority(bitmap.BitIndex(piece))
981         if !ok {
982                 return PiecePriorityNone
983         }
984         if prio > 0 {
985                 panic(prio)
986         }
987         ret := piecePriority(-prio)
988         if ret == PiecePriorityNone {
989                 panic(piece)
990         }
991         return ret
992 }
993
994 func (t *Torrent) pendRequest(req request) {
995         ci := chunkIndex(req.chunkSpec, t.chunkSize)
996         t.pieces[req.Index].pendChunkIndex(ci)
997 }
998
999 func (t *Torrent) pieceCompletionChanged(piece pieceIndex) {
1000         log.Call().Add("piece", piece).AddValue(debugLogValue).Log(t.logger)
1001         t.cl.event.Broadcast()
1002         if t.pieceComplete(piece) {
1003                 t.onPieceCompleted(piece)
1004         } else {
1005                 t.onIncompletePiece(piece)
1006         }
1007         t.updatePiecePriority(piece)
1008 }
1009
1010 func (t *Torrent) numReceivedConns() (ret int) {
1011         for c := range t.conns {
1012                 if c.Discovery == peerSourceIncoming {
1013                         ret++
1014                 }
1015         }
1016         return
1017 }
1018
1019 func (t *Torrent) maxHalfOpen() int {
1020         // Note that if we somehow exceed the maximum established conns, we want
1021         // the negative value to have an effect.
1022         establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
1023         extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
1024         // We want to allow some experimentation with new peers, and to try to
1025         // upset an oversupply of received connections.
1026         return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.config.HalfOpenConnsPerTorrent)))
1027 }
1028
1029 func (t *Torrent) openNewConns() {
1030         defer t.updateWantPeersEvent()
1031         for t.peers.Len() != 0 {
1032                 if !t.wantConns() {
1033                         return
1034                 }
1035                 if len(t.halfOpen) >= t.maxHalfOpen() {
1036                         return
1037                 }
1038                 p := t.peers.PopMax()
1039                 t.initiateConn(p)
1040         }
1041 }
1042
1043 func (t *Torrent) getConnPieceInclination() []int {
1044         _ret := t.connPieceInclinationPool.Get()
1045         if _ret == nil {
1046                 pieceInclinationsNew.Add(1)
1047                 return rand.Perm(int(t.numPieces()))
1048         }
1049         pieceInclinationsReused.Add(1)
1050         return *_ret.(*[]int)
1051 }
1052
1053 func (t *Torrent) putPieceInclination(pi []int) {
1054         t.connPieceInclinationPool.Put(&pi)
1055         pieceInclinationsPut.Add(1)
1056 }
1057
1058 func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
1059         pcu := t.pieceCompleteUncached(piece)
1060         p := &t.pieces[piece]
1061         changed := t.completedPieces.Get(bitmap.BitIndex(piece)) != pcu.Complete || p.storageCompletionOk != pcu.Ok
1062         log.Fmsg("piece %d completion: %v", piece, pcu.Ok).AddValue(debugLogValue).Log(t.logger)
1063         p.storageCompletionOk = pcu.Ok
1064         t.completedPieces.Set(bitmap.BitIndex(piece), pcu.Complete)
1065         t.tickleReaders()
1066         // t.logger.Printf("piece %d uncached completion: %v", piece, pcu.Complete)
1067         // t.logger.Printf("piece %d changed: %v", piece, changed)
1068         if changed {
1069                 t.pieceCompletionChanged(piece)
1070         }
1071         return changed
1072 }
1073
1074 // Non-blocking read. Client lock is not required.
1075 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1076         p := &t.pieces[off/t.info.PieceLength]
1077         p.waitNoPendingWrites()
1078         return p.Storage().ReadAt(b, off-p.Info().Offset())
1079 }
1080
1081 func (t *Torrent) updateAllPieceCompletions() {
1082         for i := pieceIndex(0); i < t.numPieces(); i++ {
1083                 t.updatePieceCompletion(i)
1084         }
1085 }
1086
1087 // Returns an error if the metadata was completed, but couldn't be set for
1088 // some reason. Blame it on the last peer to contribute.
1089 func (t *Torrent) maybeCompleteMetadata() error {
1090         if t.haveInfo() {
1091                 // Nothing to do.
1092                 return nil
1093         }
1094         if !t.haveAllMetadataPieces() {
1095                 // Don't have enough metadata pieces.
1096                 return nil
1097         }
1098         err := t.setInfoBytes(t.metadataBytes)
1099         if err != nil {
1100                 t.invalidateMetadata()
1101                 return fmt.Errorf("error setting info bytes: %s", err)
1102         }
1103         if t.cl.config.Debug {
1104                 t.logger.Printf("%s: got metadata from peers", t)
1105         }
1106         return nil
1107 }
1108
1109 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1110         t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
1111                 if end > begin {
1112                         now.Add(bitmap.BitIndex(begin))
1113                         readahead.AddRange(bitmap.BitIndex(begin)+1, bitmap.BitIndex(end))
1114                 }
1115                 return true
1116         })
1117         return
1118 }
1119
1120 func (t *Torrent) needData() bool {
1121         if t.closed.IsSet() {
1122                 return false
1123         }
1124         if !t.haveInfo() {
1125                 return true
1126         }
1127         return t.pendingPieces.Len() != 0
1128 }
1129
1130 func appendMissingStrings(old, new []string) (ret []string) {
1131         ret = old
1132 new:
1133         for _, n := range new {
1134                 for _, o := range old {
1135                         if o == n {
1136                                 continue new
1137                         }
1138                 }
1139                 ret = append(ret, n)
1140         }
1141         return
1142 }
1143
1144 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1145         ret = existing
1146         for minNumTiers > len(ret) {
1147                 ret = append(ret, nil)
1148         }
1149         return
1150 }
1151
1152 func (t *Torrent) addTrackers(announceList [][]string) {
1153         fullAnnounceList := &t.metainfo.AnnounceList
1154         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1155         for tierIndex, trackerURLs := range announceList {
1156                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1157         }
1158         t.startMissingTrackerScrapers()
1159         t.updateWantPeersEvent()
1160 }
1161
1162 // Don't call this before the info is available.
1163 func (t *Torrent) bytesCompleted() int64 {
1164         if !t.haveInfo() {
1165                 return 0
1166         }
1167         return t.info.TotalLength() - t.bytesLeft()
1168 }
1169
1170 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1171         t.cl.lock()
1172         defer t.cl.unlock()
1173         return t.setInfoBytes(b)
1174 }
1175
1176 // Returns true if connection is removed from torrent.Conns.
1177 func (t *Torrent) deleteConnection(c *connection) (ret bool) {
1178         if !c.closed.IsSet() {
1179                 panic("connection is not closed")
1180                 // There are behaviours prevented by the closed state that will fail
1181                 // if the connection has been deleted.
1182         }
1183         _, ret = t.conns[c]
1184         delete(t.conns, c)
1185         torrent.Add("deleted connections", 1)
1186         c.deleteAllRequests()
1187         if len(t.conns) == 0 {
1188                 t.assertNoPendingRequests()
1189         }
1190         return
1191 }
1192
1193 func (t *Torrent) assertNoPendingRequests() {
1194         if len(t.pendingRequests) != 0 {
1195                 panic(t.pendingRequests)
1196         }
1197         if len(t.lastRequested) != 0 {
1198                 panic(t.lastRequested)
1199         }
1200 }
1201
1202 func (t *Torrent) dropConnection(c *connection) {
1203         t.cl.event.Broadcast()
1204         c.Close()
1205         if t.deleteConnection(c) {
1206                 t.openNewConns()
1207         }
1208 }
1209
1210 func (t *Torrent) wantPeers() bool {
1211         if t.closed.IsSet() {
1212                 return false
1213         }
1214         if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
1215                 return false
1216         }
1217         return t.needData() || t.seeding()
1218 }
1219
1220 func (t *Torrent) updateWantPeersEvent() {
1221         if t.wantPeers() {
1222                 t.wantPeersEvent.Set()
1223         } else {
1224                 t.wantPeersEvent.Clear()
1225         }
1226 }
1227
1228 // Returns whether the client should make effort to seed the torrent.
1229 func (t *Torrent) seeding() bool {
1230         cl := t.cl
1231         if t.closed.IsSet() {
1232                 return false
1233         }
1234         if cl.config.NoUpload {
1235                 return false
1236         }
1237         if !cl.config.Seed {
1238                 return false
1239         }
1240         if cl.config.DisableAggressiveUpload && t.needData() {
1241                 return false
1242         }
1243         return true
1244 }
1245
1246 func (t *Torrent) startScrapingTracker(_url string) {
1247         if _url == "" {
1248                 return
1249         }
1250         u, err := url.Parse(_url)
1251         if err != nil {
1252                 // URLs with a leading '*' appear to be a uTorrent convention to
1253                 // disable trackers.
1254                 if _url[0] != '*' {
1255                         log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger)
1256                 }
1257                 return
1258         }
1259         if u.Scheme == "udp" {
1260                 u.Scheme = "udp4"
1261                 t.startScrapingTracker(u.String())
1262                 u.Scheme = "udp6"
1263                 t.startScrapingTracker(u.String())
1264                 return
1265         }
1266         if u.Scheme == "udp4" && (t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4) {
1267                 return
1268         }
1269         if u.Scheme == "udp6" && t.cl.config.DisableIPv6 {
1270                 return
1271         }
1272         if _, ok := t.trackerAnnouncers[_url]; ok {
1273                 return
1274         }
1275         newAnnouncer := &trackerScraper{
1276                 u: *u,
1277                 t: t,
1278         }
1279         if t.trackerAnnouncers == nil {
1280                 t.trackerAnnouncers = make(map[string]*trackerScraper)
1281         }
1282         t.trackerAnnouncers[_url] = newAnnouncer
1283         go newAnnouncer.Run()
1284 }
1285
1286 // Adds and starts tracker scrapers for tracker URLs that aren't already
1287 // running.
1288 func (t *Torrent) startMissingTrackerScrapers() {
1289         if t.cl.config.DisableTrackers {
1290                 return
1291         }
1292         t.startScrapingTracker(t.metainfo.Announce)
1293         for _, tier := range t.metainfo.AnnounceList {
1294                 for _, url := range tier {
1295                         t.startScrapingTracker(url)
1296                 }
1297         }
1298 }
1299
1300 // Returns an AnnounceRequest with fields filled out to defaults and current
1301 // values.
1302 func (t *Torrent) announceRequest(event tracker.AnnounceEvent) tracker.AnnounceRequest {
1303         // Note that IPAddress is not set. It's set for UDP inside the tracker
1304         // code, since it's dependent on the network in use.
1305         return tracker.AnnounceRequest{
1306                 Event:    event,
1307                 NumWant:  -1,
1308                 Port:     uint16(t.cl.incomingPeerPort()),
1309                 PeerId:   t.cl.peerID,
1310                 InfoHash: t.infoHash,
1311                 Key:      t.cl.announceKey(),
1312
1313                 // The following are vaguely described in BEP 3.
1314
1315                 Left:     t.bytesLeftAnnounce(),
1316                 Uploaded: t.stats.BytesWrittenData.Int64(),
1317                 // There's no mention of wasted or unwanted download in the BEP.
1318                 Downloaded: t.stats.BytesReadUsefulData.Int64(),
1319         }
1320 }
1321
1322 // Adds peers revealed in an announce until the announce ends, or we have
1323 // enough peers.
1324 func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
1325         cl := t.cl
1326         for v := range pvs {
1327                 cl.lock()
1328                 for _, cp := range v.Peers {
1329                         if cp.Port == 0 {
1330                                 // Can't do anything with this.
1331                                 continue
1332                         }
1333                         t.addPeer(Peer{
1334                                 IP:     cp.IP[:],
1335                                 Port:   cp.Port,
1336                                 Source: peerSourceDHTGetPeers,
1337                         })
1338                 }
1339                 cl.unlock()
1340         }
1341 }
1342
1343 func (t *Torrent) announceToDht(impliedPort bool, s *dht.Server) error {
1344         ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), impliedPort)
1345         if err != nil {
1346                 return err
1347         }
1348         go t.consumeDhtAnnouncePeers(ps.Peers)
1349         select {
1350         case <-t.closed.LockedChan(t.cl.locker()):
1351         case <-time.After(5 * time.Minute):
1352         }
1353         ps.Close()
1354         return nil
1355 }
1356
1357 func (t *Torrent) dhtAnnouncer(s *dht.Server) {
1358         cl := t.cl
1359         for {
1360                 select {
1361                 case <-t.closed.LockedChan(cl.locker()):
1362                         return
1363                 case <-t.wantPeersEvent.LockedChan(cl.locker()):
1364                 }
1365                 cl.lock()
1366                 t.numDHTAnnounces++
1367                 cl.unlock()
1368                 err := t.announceToDht(true, s)
1369                 if err != nil {
1370                         t.logger.Printf("error announcing %q to DHT: %s", t, err)
1371                 }
1372         }
1373 }
1374
1375 func (t *Torrent) addPeers(peers []Peer) {
1376         for _, p := range peers {
1377                 t.addPeer(p)
1378         }
1379 }
1380
1381 func (t *Torrent) Stats() TorrentStats {
1382         t.cl.rLock()
1383         defer t.cl.rUnlock()
1384         return t.statsLocked()
1385 }
1386
1387 func (t *Torrent) statsLocked() (ret TorrentStats) {
1388         ret.ActivePeers = len(t.conns)
1389         ret.HalfOpenPeers = len(t.halfOpen)
1390         ret.PendingPeers = t.peers.Len()
1391         ret.TotalPeers = t.numTotalPeers()
1392         ret.ConnectedSeeders = 0
1393         for c := range t.conns {
1394                 if all, ok := c.peerHasAllPieces(); all && ok {
1395                         ret.ConnectedSeeders++
1396                 }
1397         }
1398         ret.ConnStats = t.stats.Copy()
1399         return
1400 }
1401
1402 // The total number of peers in the torrent.
1403 func (t *Torrent) numTotalPeers() int {
1404         peers := make(map[string]struct{})
1405         for conn := range t.conns {
1406                 ra := conn.conn.RemoteAddr()
1407                 if ra == nil {
1408                         // It's been closed and doesn't support RemoteAddr.
1409                         continue
1410                 }
1411                 peers[ra.String()] = struct{}{}
1412         }
1413         for addr := range t.halfOpen {
1414                 peers[addr] = struct{}{}
1415         }
1416         t.peers.Each(func(peer Peer) {
1417                 peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{}
1418         })
1419         return len(peers)
1420 }
1421
1422 // Reconcile bytes transferred before connection was associated with a
1423 // torrent.
1424 func (t *Torrent) reconcileHandshakeStats(c *connection) {
1425         if c.stats != (ConnStats{
1426                 // Handshakes should only increment these fields:
1427                 BytesWritten: c.stats.BytesWritten,
1428                 BytesRead:    c.stats.BytesRead,
1429         }) {
1430                 panic("bad stats")
1431         }
1432         c.postHandshakeStats(func(cs *ConnStats) {
1433                 cs.BytesRead.Add(c.stats.BytesRead.Int64())
1434                 cs.BytesWritten.Add(c.stats.BytesWritten.Int64())
1435         })
1436         c.reconciledHandshakeStats = true
1437 }
1438
1439 // Returns true if the connection is added.
1440 func (t *Torrent) addConnection(c *connection) (err error) {
1441         defer func() {
1442                 if err == nil {
1443                         torrent.Add("added connections", 1)
1444                 }
1445         }()
1446         if t.closed.IsSet() {
1447                 return errors.New("torrent closed")
1448         }
1449         for c0 := range t.conns {
1450                 if c.PeerID != c0.PeerID {
1451                         continue
1452                 }
1453                 if !t.cl.config.dropDuplicatePeerIds {
1454                         continue
1455                 }
1456                 if left, ok := c.hasPreferredNetworkOver(c0); ok && left {
1457                         c0.Close()
1458                         t.deleteConnection(c0)
1459                 } else {
1460                         return errors.New("existing connection preferred")
1461                 }
1462         }
1463         if len(t.conns) >= t.maxEstablishedConns {
1464                 c := t.worstBadConn()
1465                 if c == nil {
1466                         return errors.New("don't want conns")
1467                 }
1468                 c.Close()
1469                 t.deleteConnection(c)
1470         }
1471         if len(t.conns) >= t.maxEstablishedConns {
1472                 panic(len(t.conns))
1473         }
1474         t.conns[c] = struct{}{}
1475         return nil
1476 }
1477
1478 func (t *Torrent) wantConns() bool {
1479         if !t.networkingEnabled {
1480                 return false
1481         }
1482         if t.closed.IsSet() {
1483                 return false
1484         }
1485         if !t.seeding() && !t.needData() {
1486                 return false
1487         }
1488         if len(t.conns) < t.maxEstablishedConns {
1489                 return true
1490         }
1491         return t.worstBadConn() != nil
1492 }
1493
1494 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1495         t.cl.lock()
1496         defer t.cl.unlock()
1497         oldMax = t.maxEstablishedConns
1498         t.maxEstablishedConns = max
1499         wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
1500         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1501                 t.dropConnection(wcs.Pop().(*connection))
1502         }
1503         t.openNewConns()
1504         return oldMax
1505 }
1506
1507 func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) {
1508         log.Fmsg("hashed piece %d", piece).Add("piece", piece).Add("passed", correct).AddValue(debugLogValue).Log(t.logger)
1509         if t.closed.IsSet() {
1510                 return
1511         }
1512         p := &t.pieces[piece]
1513         touchers := t.reapPieceTouchers(piece)
1514         if p.storageCompletionOk {
1515                 // Don't score the first time a piece is hashed, it could be an
1516                 // initial check.
1517                 if correct {
1518                         pieceHashedCorrect.Add(1)
1519                 } else {
1520                         log.Fmsg("piece failed hash: %d connections contributed", len(touchers)).AddValues(t, p).Log(t.logger)
1521                         pieceHashedNotCorrect.Add(1)
1522                 }
1523         }
1524         if correct {
1525                 if len(touchers) != 0 {
1526                         // Don't increment stats above connection-level for every involved
1527                         // connection.
1528                         t.allStats((*ConnStats).incrementPiecesDirtiedGood)
1529                 }
1530                 for _, c := range touchers {
1531                         c.stats.incrementPiecesDirtiedGood()
1532                 }
1533                 err := p.Storage().MarkComplete()
1534                 if err != nil {
1535                         t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
1536                 }
1537         } else {
1538                 if len(touchers) != 0 {
1539                         // Don't increment stats above connection-level for every involved
1540                         // connection.
1541                         t.allStats((*ConnStats).incrementPiecesDirtiedBad)
1542                         for _, c := range touchers {
1543                                 // Y u do dis peer?!
1544                                 c.stats.incrementPiecesDirtiedBad()
1545                         }
1546                         slices.Sort(touchers, connLessTrusted)
1547                         if t.cl.config.Debug {
1548                                 t.logger.Printf("dropping first corresponding conn from trust: %v", func() (ret []int64) {
1549                                         for _, c := range touchers {
1550                                                 ret = append(ret, c.netGoodPiecesDirtied())
1551                                         }
1552                                         return
1553                                 }())
1554                         }
1555                         c := touchers[0]
1556                         t.cl.banPeerIP(c.remoteAddr.IP)
1557                         c.Drop()
1558                 }
1559                 t.onIncompletePiece(piece)
1560                 p.Storage().MarkNotComplete()
1561         }
1562         t.updatePieceCompletion(piece)
1563 }
1564
1565 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
1566         // TODO: Make faster
1567         for cn := range t.conns {
1568                 cn.tickleWriter()
1569         }
1570 }
1571
1572 func (t *Torrent) onPieceCompleted(piece pieceIndex) {
1573         t.pendAllChunkSpecs(piece)
1574         t.cancelRequestsForPiece(piece)
1575         for conn := range t.conns {
1576                 conn.Have(piece)
1577         }
1578 }
1579
1580 // Called when a piece is found to be not complete.
1581 func (t *Torrent) onIncompletePiece(piece pieceIndex) {
1582         if t.pieceAllDirty(piece) {
1583                 t.pendAllChunkSpecs(piece)
1584         }
1585         if !t.wantPieceIndex(piece) {
1586                 // t.logger.Printf("piece %d incomplete and unwanted", piece)
1587                 return
1588         }
1589         // We could drop any connections that we told we have a piece that we
1590         // don't here. But there's a test failure, and it seems clients don't care
1591         // if you request pieces that you already claim to have. Pruning bad
1592         // connections might just remove any connections that aren't treating us
1593         // favourably anyway.
1594
1595         // for c := range t.conns {
1596         //      if c.sentHave(piece) {
1597         //              c.Drop()
1598         //      }
1599         // }
1600         for conn := range t.conns {
1601                 if conn.PeerHasPiece(piece) {
1602                         conn.updateRequests()
1603                 }
1604         }
1605 }
1606
1607 func (t *Torrent) verifyPiece(piece pieceIndex) {
1608         cl := t.cl
1609         cl.lock()
1610         defer cl.unlock()
1611         p := &t.pieces[piece]
1612         defer func() {
1613                 p.numVerifies++
1614                 cl.event.Broadcast()
1615         }()
1616         for p.hashing || t.storage == nil {
1617                 cl.event.Wait()
1618         }
1619         if !p.t.piecesQueuedForHash.Remove(bitmap.BitIndex(piece)) {
1620                 panic("piece was not queued")
1621         }
1622         t.updatePiecePriority(piece)
1623         if t.closed.IsSet() {
1624                 return
1625         }
1626         p.hashing = true
1627         t.publishPieceChange(piece)
1628         t.updatePiecePriority(piece)
1629         t.storageLock.RLock()
1630         cl.unlock()
1631         sum := t.hashPiece(piece)
1632         t.storageLock.RUnlock()
1633         cl.lock()
1634         p.hashing = false
1635         t.updatePiecePriority(piece)
1636         t.pieceHashed(piece, sum == *p.hash)
1637         t.publishPieceChange(piece)
1638 }
1639
1640 // Return the connections that touched a piece, and clear the entries while
1641 // doing it.
1642 func (t *Torrent) reapPieceTouchers(piece pieceIndex) (ret []*connection) {
1643         for c := range t.pieces[piece].dirtiers {
1644                 delete(c.peerTouchedPieces, piece)
1645                 ret = append(ret, c)
1646         }
1647         t.pieces[piece].dirtiers = nil
1648         return
1649 }
1650
1651 func (t *Torrent) connsAsSlice() (ret []*connection) {
1652         for c := range t.conns {
1653                 ret = append(ret, c)
1654         }
1655         return
1656 }
1657
1658 // Currently doesn't really queue, but should in the future.
1659 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
1660         piece := &t.pieces[pieceIndex]
1661         if piece.queuedForHash() {
1662                 return
1663         }
1664         t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
1665         t.publishPieceChange(pieceIndex)
1666         t.updatePiecePriority(pieceIndex)
1667         go t.verifyPiece(pieceIndex)
1668 }
1669
1670 // Forces all the pieces to be re-hashed. See also Piece.VerifyData.
1671 func (t *Torrent) VerifyData() {
1672         for i := pieceIndex(0); i < t.NumPieces(); i++ {
1673                 t.Piece(i).VerifyData()
1674         }
1675 }
1676
1677 // Start the process of connecting to the given peer for the given torrent if
1678 // appropriate.
1679 func (t *Torrent) initiateConn(peer Peer) {
1680         if peer.Id == t.cl.peerID {
1681                 return
1682         }
1683         if t.cl.badPeerIPPort(peer.IP, peer.Port) {
1684                 return
1685         }
1686         addr := IpPort{peer.IP, uint16(peer.Port)}
1687         if t.addrActive(addr.String()) {
1688                 return
1689         }
1690         t.halfOpen[addr.String()] = peer
1691         go t.cl.outgoingConnection(t, addr, peer.Source)
1692 }
1693
1694 func (t *Torrent) AddClientPeer(cl *Client) {
1695         t.AddPeers(func() (ps []Peer) {
1696                 for _, la := range cl.ListenAddrs() {
1697                         ps = append(ps, Peer{
1698                                 IP:   missinggo.AddrIP(la),
1699                                 Port: missinggo.AddrPort(la),
1700                         })
1701                 }
1702                 return
1703         }())
1704 }
1705
1706 // All stats that include this Torrent. Useful when we want to increment
1707 // ConnStats but not for every connection.
1708 func (t *Torrent) allStats(f func(*ConnStats)) {
1709         f(&t.stats)
1710         f(&t.cl.stats)
1711 }
1712
1713 func (t *Torrent) hashingPiece(i pieceIndex) bool {
1714         return t.pieces[i].hashing
1715 }
1716
1717 func (t *Torrent) pieceQueuedForHash(i pieceIndex) bool {
1718         return t.piecesQueuedForHash.Get(bitmap.BitIndex(i))
1719 }
1720
1721 func (t *Torrent) dialTimeout() time.Duration {
1722         return reducedDialTimeout(t.cl.config.MinDialTimeout, t.cl.config.NominalDialTimeout, t.cl.config.HalfOpenConnsPerTorrent, t.peers.Len())
1723 }