]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Add lots of new expvars and perf timers
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "crypto/sha1"
6         "errors"
7         "fmt"
8         "io"
9         "math"
10         "math/rand"
11         "net"
12         "net/url"
13         "os"
14         "strconv"
15         "sync"
16         "text/tabwriter"
17         "time"
18
19         "github.com/anacrolix/dht"
20         "github.com/anacrolix/log"
21         "github.com/anacrolix/missinggo"
22         "github.com/anacrolix/missinggo/bitmap"
23         "github.com/anacrolix/missinggo/perf"
24         "github.com/anacrolix/missinggo/prioritybitmap"
25         "github.com/anacrolix/missinggo/pubsub"
26         "github.com/anacrolix/missinggo/slices"
27         "github.com/bradfitz/iter"
28         "github.com/davecgh/go-spew/spew"
29
30         "github.com/anacrolix/torrent/bencode"
31         "github.com/anacrolix/torrent/metainfo"
32         pp "github.com/anacrolix/torrent/peer_protocol"
33         "github.com/anacrolix/torrent/storage"
34         "github.com/anacrolix/torrent/tracker"
35 )
36
37 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
38         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
39 }
40
41 type peersKey struct {
42         IPBytes string
43         Port    int
44 }
45
46 // Maintains state of torrent within a Client.
47 type Torrent struct {
48         cl     *Client
49         logger *log.Logger
50
51         networkingEnabled bool
52         // Determines what chunks to request from peers. 1: Favour higher priority
53         // pieces with some fuzzing to reduce overlaps and wastage across
54         // connections. 2: The fastest connection downloads strictly in order of
55         // priority, while all others adher to their piece inclications.
56         requestStrategy int
57
58         closed   missinggo.Event
59         infoHash metainfo.Hash
60         pieces   []Piece
61         // Values are the piece indices that changed.
62         pieceStateChanges *pubsub.PubSub
63         // The size of chunks to request from peers over the wire. This is
64         // normally 16KiB by convention these days.
65         chunkSize pp.Integer
66         chunkPool *sync.Pool
67         // Total length of the torrent in bytes. Stored because it's not O(1) to
68         // get this from the info dict.
69         length *int64
70
71         // The storage to open when the info dict becomes available.
72         storageOpener *storage.Client
73         // Storage for torrent data.
74         storage *storage.Torrent
75         // Read-locked for using storage, and write-locked for Closing.
76         storageLock sync.RWMutex
77
78         // TODO: Only announce stuff is used?
79         metainfo metainfo.MetaInfo
80
81         // The info dict. nil if we don't have it (yet).
82         info  *metainfo.Info
83         files *[]*File
84
85         // Active peer connections, running message stream loops. TODO: Make this
86         // open (not-closed) connections only.
87         conns               map[*connection]struct{}
88         maxEstablishedConns int
89         // Set of addrs to which we're attempting to connect. Connections are
90         // half-open until all handshakes are completed.
91         halfOpen    map[string]Peer
92         fastestConn *connection
93
94         // Reserve of peers to connect to. A peer can be both here and in the
95         // active connections if were told about the peer after connecting with
96         // them. That encourages us to reconnect to peers that are well known in
97         // the swarm.
98         peers          prioritizedPeers
99         wantPeersEvent missinggo.Event
100         // An announcer for each tracker URL.
101         trackerAnnouncers map[string]*trackerScraper
102         // How many times we've initiated a DHT announce. TODO: Move into stats.
103         numDHTAnnounces int
104
105         // Name used if the info name isn't available. Should be cleared when the
106         // Info does become available.
107         displayName string
108
109         // The bencoded bytes of the info dict. This is actively manipulated if
110         // the info bytes aren't initially available, and we try to fetch them
111         // from peers.
112         metadataBytes []byte
113         // Each element corresponds to the 16KiB metadata pieces. If true, we have
114         // received that piece.
115         metadataCompletedChunks []bool
116         metadataChanged         sync.Cond
117
118         // Set when .Info is obtained.
119         gotMetainfo missinggo.Event
120
121         readers               map[*reader]struct{}
122         readerNowPieces       bitmap.Bitmap
123         readerReadaheadPieces bitmap.Bitmap
124
125         // A cache of pieces we need to get. Calculated from various piece and
126         // file priorities and completion states elsewhere.
127         pendingPieces prioritybitmap.PriorityBitmap
128         // A cache of completed piece indices.
129         completedPieces bitmap.Bitmap
130         // Pieces that need to be hashed.
131         piecesQueuedForHash bitmap.Bitmap
132
133         // A pool of piece priorities []int for assignment to new connections.
134         // These "inclinations" are used to give connections preference for
135         // different pieces.
136         connPieceInclinationPool sync.Pool
137         // Torrent-level statistics.
138         stats ConnStats
139
140         // Count of each request across active connections.
141         pendingRequests map[request]int
142 }
143
144 func (t *Torrent) tickleReaders() {
145         t.cl.event.Broadcast()
146 }
147
148 // Returns a channel that is closed when the Torrent is closed.
149 func (t *Torrent) Closed() <-chan struct{} {
150         return t.closed.LockedChan(&t.cl.mu)
151 }
152
153 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
154 // pending, and half-open peers.
155 func (t *Torrent) KnownSwarm() (ks []Peer) {
156         // Add pending peers to the list
157         t.peers.Each(func(peer Peer) {
158                 ks = append(ks, peer)
159         })
160
161         // Add half-open peers to the list
162         for _, peer := range t.halfOpen {
163                 ks = append(ks, peer)
164         }
165
166         // Add active peers to the list
167         for conn := range t.conns {
168                 host, portString, err := net.SplitHostPort(conn.remoteAddr().String())
169                 if err != nil {
170                         panic(err)
171                 }
172
173                 ip := net.ParseIP(host)
174                 port, err := strconv.Atoi(portString)
175                 if err != nil {
176                         panic(err)
177                 }
178
179                 ks = append(ks, Peer{
180                         Id:     conn.PeerID,
181                         IP:     ip,
182                         Port:   port,
183                         Source: conn.Discovery,
184                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
185                         // > But if we're not connected to them with an encrypted connection, I couldn't say
186                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
187                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
188                         // > connection. It's probably easiest to do the latter for now.
189                         // https://github.com/anacrolix/torrent/pull/188
190                         SupportsEncryption: conn.headerEncrypted,
191                 })
192         }
193
194         return
195 }
196
197 func (t *Torrent) setChunkSize(size pp.Integer) {
198         t.chunkSize = size
199         t.chunkPool = &sync.Pool{
200                 New: func() interface{} {
201                         b := make([]byte, size)
202                         return &b
203                 },
204         }
205 }
206
207 func (t *Torrent) setDisplayName(dn string) {
208         if t.haveInfo() {
209                 return
210         }
211         t.displayName = dn
212 }
213
214 func (t *Torrent) pieceComplete(piece int) bool {
215         return t.completedPieces.Get(piece)
216 }
217
218 func (t *Torrent) pieceCompleteUncached(piece int) storage.Completion {
219         return t.pieces[piece].Storage().Completion()
220 }
221
222 // There's a connection to that address already.
223 func (t *Torrent) addrActive(addr string) bool {
224         if _, ok := t.halfOpen[addr]; ok {
225                 return true
226         }
227         for c := range t.conns {
228                 ra := c.remoteAddr()
229                 if ra == nil {
230                         continue
231                 }
232                 if ra.String() == addr {
233                         return true
234                 }
235         }
236         return false
237 }
238
239 func (t *Torrent) unclosedConnsAsSlice() (ret []*connection) {
240         ret = make([]*connection, 0, len(t.conns))
241         for c := range t.conns {
242                 if !c.closed.IsSet() {
243                         ret = append(ret, c)
244                 }
245         }
246         return
247 }
248
249 func (t *Torrent) addPeer(p Peer) {
250         cl := t.cl
251         peersAddedBySource.Add(string(p.Source), 1)
252         if t.closed.IsSet() {
253                 return
254         }
255         if cl.badPeerIPPort(p.IP, p.Port) {
256                 torrent.Add("peers not added because of bad addr", 1)
257                 return
258         }
259         if t.peers.Add(p) {
260                 torrent.Add("peers replaced", 1)
261         }
262         t.openNewConns()
263         for t.peers.Len() > cl.config.TorrentPeersHighWater {
264                 _, ok := t.peers.DeleteMin()
265                 if ok {
266                         torrent.Add("excess reserve peers discarded", 1)
267                 }
268         }
269 }
270
271 func (t *Torrent) invalidateMetadata() {
272         for i := range t.metadataCompletedChunks {
273                 t.metadataCompletedChunks[i] = false
274         }
275         t.info = nil
276 }
277
278 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
279         if t.haveInfo() {
280                 return
281         }
282         if index >= len(t.metadataCompletedChunks) {
283                 log.Printf("%s: ignoring metadata piece %d", t, index)
284                 return
285         }
286         copy(t.metadataBytes[(1<<14)*index:], data)
287         t.metadataCompletedChunks[index] = true
288 }
289
290 func (t *Torrent) metadataPieceCount() int {
291         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
292 }
293
294 func (t *Torrent) haveMetadataPiece(piece int) bool {
295         if t.haveInfo() {
296                 return (1<<14)*piece < len(t.metadataBytes)
297         } else {
298                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
299         }
300 }
301
302 func (t *Torrent) metadataSizeKnown() bool {
303         return t.metadataBytes != nil
304 }
305
306 func (t *Torrent) metadataSize() int {
307         return len(t.metadataBytes)
308 }
309
310 func infoPieceHashes(info *metainfo.Info) (ret []string) {
311         for i := 0; i < len(info.Pieces); i += sha1.Size {
312                 ret = append(ret, string(info.Pieces[i:i+sha1.Size]))
313         }
314         return
315 }
316
317 func (t *Torrent) makePieces() {
318         hashes := infoPieceHashes(t.info)
319         t.pieces = make([]Piece, len(hashes))
320         for i, hash := range hashes {
321                 piece := &t.pieces[i]
322                 piece.t = t
323                 piece.index = i
324                 piece.noPendingWrites.L = &piece.pendingWritesMutex
325                 missinggo.CopyExact(piece.hash[:], hash)
326                 files := *t.files
327                 beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
328                 endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
329                 piece.files = files[beginFile:endFile]
330         }
331 }
332
333 // Returns the index of the first file containing the piece. files must be
334 // ordered by offset.
335 func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
336         for i, f := range files {
337                 if f.offset+f.length > pieceOffset {
338                         return i
339                 }
340         }
341         return 0
342 }
343
344 // Returns the index after the last file containing the piece. files must be
345 // ordered by offset.
346 func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
347         for i, f := range files {
348                 if f.offset+f.length >= pieceEndOffset {
349                         return i + 1
350                 }
351         }
352         return 0
353 }
354
355 func (t *Torrent) cacheLength() {
356         var l int64
357         for _, f := range t.info.UpvertedFiles() {
358                 l += f.Length
359         }
360         t.length = &l
361 }
362
363 func (t *Torrent) setInfo(info *metainfo.Info) error {
364         if err := validateInfo(info); err != nil {
365                 return fmt.Errorf("bad info: %s", err)
366         }
367         if t.storageOpener != nil {
368                 var err error
369                 t.storage, err = t.storageOpener.OpenTorrent(info, t.infoHash)
370                 if err != nil {
371                         return fmt.Errorf("error opening torrent storage: %s", err)
372                 }
373         }
374         t.info = info
375         t.displayName = "" // Save a few bytes lol.
376         t.initFiles()
377         t.cacheLength()
378         t.makePieces()
379         return nil
380 }
381
382 func (t *Torrent) onSetInfo() {
383         for conn := range t.conns {
384                 if err := conn.setNumPieces(t.numPieces()); err != nil {
385                         log.Printf("closing connection: %s", err)
386                         conn.Close()
387                 }
388         }
389         for i := range t.pieces {
390                 t.updatePieceCompletion(i)
391                 p := &t.pieces[i]
392                 if !p.storageCompletionOk {
393                         // log.Printf("piece %s completion unknown, queueing check", p)
394                         t.queuePieceCheck(i)
395                 }
396         }
397         t.cl.event.Broadcast()
398         t.gotMetainfo.Set()
399         t.updateWantPeersEvent()
400         t.pendingRequests = make(map[request]int)
401 }
402
403 // Called when metadata for a torrent becomes available.
404 func (t *Torrent) setInfoBytes(b []byte) error {
405         if metainfo.HashBytes(b) != t.infoHash {
406                 return errors.New("info bytes have wrong hash")
407         }
408         var info metainfo.Info
409         if err := bencode.Unmarshal(b, &info); err != nil {
410                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
411         }
412         if err := t.setInfo(&info); err != nil {
413                 return err
414         }
415         t.metadataBytes = b
416         t.metadataCompletedChunks = nil
417         t.onSetInfo()
418         return nil
419 }
420
421 func (t *Torrent) haveAllMetadataPieces() bool {
422         if t.haveInfo() {
423                 return true
424         }
425         if t.metadataCompletedChunks == nil {
426                 return false
427         }
428         for _, have := range t.metadataCompletedChunks {
429                 if !have {
430                         return false
431                 }
432         }
433         return true
434 }
435
436 // TODO: Propagate errors to disconnect peer.
437 func (t *Torrent) setMetadataSize(bytes int64) (err error) {
438         if t.haveInfo() {
439                 // We already know the correct metadata size.
440                 return
441         }
442         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
443                 return errors.New("bad size")
444         }
445         if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
446                 return
447         }
448         t.metadataBytes = make([]byte, bytes)
449         t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
450         t.metadataChanged.Broadcast()
451         for c := range t.conns {
452                 c.requestPendingMetadata()
453         }
454         return
455 }
456
457 // The current working name for the torrent. Either the name in the info dict,
458 // or a display name given such as by the dn value in a magnet link, or "".
459 func (t *Torrent) name() string {
460         if t.haveInfo() {
461                 return t.info.Name
462         }
463         return t.displayName
464 }
465
466 func (t *Torrent) pieceState(index int) (ret PieceState) {
467         p := &t.pieces[index]
468         ret.Priority = t.piecePriority(index)
469         ret.Completion = p.completion()
470         if p.queuedForHash() || p.hashing {
471                 ret.Checking = true
472         }
473         if !ret.Complete && t.piecePartiallyDownloaded(index) {
474                 ret.Partial = true
475         }
476         return
477 }
478
479 func (t *Torrent) metadataPieceSize(piece int) int {
480         return metadataPieceSize(len(t.metadataBytes), piece)
481 }
482
483 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
484         d := map[string]int{
485                 "msg_type": msgType,
486                 "piece":    piece,
487         }
488         if data != nil {
489                 d["total_size"] = len(t.metadataBytes)
490         }
491         p, err := bencode.Marshal(d)
492         if err != nil {
493                 panic(err)
494         }
495         return pp.Message{
496                 Type:            pp.Extended,
497                 ExtendedID:      c.PeerExtensionIDs["ut_metadata"],
498                 ExtendedPayload: append(p, data...),
499         }
500 }
501
502 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
503         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
504                 ret = append(ret, PieceStateRun{
505                         PieceState: el.(PieceState),
506                         Length:     int(count),
507                 })
508         })
509         for index := range t.pieces {
510                 rle.Append(t.pieceState(index), 1)
511         }
512         rle.Flush()
513         return
514 }
515
516 // Produces a small string representing a PieceStateRun.
517 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
518         ret = fmt.Sprintf("%d", psr.Length)
519         ret += func() string {
520                 switch psr.Priority {
521                 case PiecePriorityNext:
522                         return "N"
523                 case PiecePriorityNormal:
524                         return "."
525                 case PiecePriorityReadahead:
526                         return "R"
527                 case PiecePriorityNow:
528                         return "!"
529                 case PiecePriorityHigh:
530                         return "H"
531                 default:
532                         return ""
533                 }
534         }()
535         if psr.Checking {
536                 ret += "H"
537         }
538         if psr.Partial {
539                 ret += "P"
540         }
541         if psr.Complete {
542                 ret += "C"
543         }
544         if !psr.Ok {
545                 ret += "?"
546         }
547         return
548 }
549
550 func (t *Torrent) writeStatus(w io.Writer) {
551         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
552         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
553         if !t.haveInfo() {
554                 fmt.Fprintf(w, "Metadata have: ")
555                 for _, h := range t.metadataCompletedChunks {
556                         fmt.Fprintf(w, "%c", func() rune {
557                                 if h {
558                                         return 'H'
559                                 } else {
560                                         return '.'
561                                 }
562                         }())
563                 }
564                 fmt.Fprintln(w)
565         }
566         fmt.Fprintf(w, "Piece length: %s\n", func() string {
567                 if t.haveInfo() {
568                         return fmt.Sprint(t.usualPieceSize())
569                 } else {
570                         return "?"
571                 }
572         }())
573         if t.info != nil {
574                 fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
575                 fmt.Fprint(w, "Piece States:")
576                 for _, psr := range t.pieceStateRuns() {
577                         w.Write([]byte(" "))
578                         w.Write([]byte(pieceStateRunStatusChars(psr)))
579                 }
580                 fmt.Fprintln(w)
581         }
582         fmt.Fprintf(w, "Reader Pieces:")
583         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
584                 fmt.Fprintf(w, " %d:%d", begin, end)
585                 return true
586         })
587         fmt.Fprintln(w)
588
589         fmt.Fprintf(w, "Enabled trackers:\n")
590         func() {
591                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
592                 fmt.Fprintf(tw, "    URL\tNext announce\tLast announce\n")
593                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
594                         return l.u.String() < r.u.String()
595                 }).([]*trackerScraper) {
596                         fmt.Fprintf(tw, "    %s\n", ta.statusLine())
597                 }
598                 tw.Flush()
599         }()
600
601         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
602
603         spew.NewDefaultConfig()
604         spew.Fdump(w, t.statsLocked())
605
606         conns := t.connsAsSlice()
607         slices.Sort(conns, worseConn)
608         for i, c := range conns {
609                 fmt.Fprintf(w, "%2d. ", i+1)
610                 c.WriteStatus(w, t)
611         }
612 }
613
614 func (t *Torrent) haveInfo() bool {
615         return t.info != nil
616 }
617
618 // Returns a run-time generated MetaInfo that includes the info bytes and
619 // announce-list as currently known to the client.
620 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
621         return metainfo.MetaInfo{
622                 CreationDate: time.Now().Unix(),
623                 Comment:      "dynamic metainfo from client",
624                 CreatedBy:    "go.torrent",
625                 AnnounceList: t.metainfo.UpvertedAnnounceList(),
626                 InfoBytes: func() []byte {
627                         if t.haveInfo() {
628                                 return t.metadataBytes
629                         } else {
630                                 return nil
631                         }
632                 }(),
633         }
634 }
635
636 func (t *Torrent) BytesMissing() int64 {
637         t.mu().RLock()
638         defer t.mu().RUnlock()
639         return t.bytesMissingLocked()
640 }
641
642 func (t *Torrent) bytesMissingLocked() int64 {
643         return t.bytesLeft()
644 }
645
646 func (t *Torrent) bytesLeft() (left int64) {
647         bitmap.Flip(t.completedPieces, 0, t.numPieces()).IterTyped(func(piece int) bool {
648                 p := &t.pieces[piece]
649                 left += int64(p.length() - p.numDirtyBytes())
650                 return true
651         })
652         return
653 }
654
655 // Bytes left to give in tracker announces.
656 func (t *Torrent) bytesLeftAnnounce() uint64 {
657         if t.haveInfo() {
658                 return uint64(t.bytesLeft())
659         } else {
660                 return math.MaxUint64
661         }
662 }
663
664 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
665         if t.pieceComplete(piece) {
666                 return false
667         }
668         if t.pieceAllDirty(piece) {
669                 return false
670         }
671         return t.pieces[piece].hasDirtyChunks()
672 }
673
674 func (t *Torrent) usualPieceSize() int {
675         return int(t.info.PieceLength)
676 }
677
678 func (t *Torrent) numPieces() int {
679         return t.info.NumPieces()
680 }
681
682 func (t *Torrent) numPiecesCompleted() (num int) {
683         return t.completedPieces.Len()
684 }
685
686 func (t *Torrent) close() (err error) {
687         t.closed.Set()
688         t.tickleReaders()
689         if t.storage != nil {
690                 t.storageLock.Lock()
691                 t.storage.Close()
692                 t.storageLock.Unlock()
693         }
694         for conn := range t.conns {
695                 conn.Close()
696         }
697         t.cl.event.Broadcast()
698         t.pieceStateChanges.Close()
699         t.updateWantPeersEvent()
700         return
701 }
702
703 func (t *Torrent) requestOffset(r request) int64 {
704         return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
705 }
706
707 // Return the request that would include the given offset into the torrent
708 // data. Returns !ok if there is no such request.
709 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
710         return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
711 }
712
713 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
714         tr := perf.NewTimer()
715
716         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
717         if err == nil && n != len(data) {
718                 err = io.ErrShortWrite
719         }
720         if err == nil {
721                 tr.Mark("write chunk")
722         }
723         return
724 }
725
726 func (t *Torrent) bitfield() (bf []bool) {
727         bf = make([]bool, t.numPieces())
728         t.completedPieces.IterTyped(func(piece int) (again bool) {
729                 bf[piece] = true
730                 return true
731         })
732         return
733 }
734
735 func (t *Torrent) pieceNumChunks(piece int) int {
736         return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
737 }
738
739 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
740         t.pieces[pieceIndex].dirtyChunks.Clear()
741 }
742
743 func (t *Torrent) pieceLength(piece int) pp.Integer {
744         if t.info.PieceLength == 0 {
745                 // There will be no variance amongst pieces. Only pain.
746                 return 0
747         }
748         if piece == t.numPieces()-1 {
749                 ret := pp.Integer(*t.length % t.info.PieceLength)
750                 if ret != 0 {
751                         return ret
752                 }
753         }
754         return pp.Integer(t.info.PieceLength)
755 }
756
757 func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
758         hash := pieceHash.New()
759         p := &t.pieces[piece]
760         p.waitNoPendingWrites()
761         ip := t.info.Piece(piece)
762         pl := ip.Length()
763         n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
764         if n == pl {
765                 missinggo.CopyExact(&ret, hash.Sum(nil))
766                 return
767         }
768         if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
769                 log.Printf("unexpected error hashing piece with %T: %s", t.storage.TorrentImpl, err)
770         }
771         return
772 }
773
774 func (t *Torrent) haveAnyPieces() bool {
775         return t.completedPieces.Len() != 0
776 }
777
778 func (t *Torrent) haveAllPieces() bool {
779         if !t.haveInfo() {
780                 return false
781         }
782         return t.completedPieces.Len() == t.numPieces()
783 }
784
785 func (t *Torrent) havePiece(index int) bool {
786         return t.haveInfo() && t.pieceComplete(index)
787 }
788
789 func (t *Torrent) haveChunk(r request) (ret bool) {
790         // defer func() {
791         //      log.Println("have chunk", r, ret)
792         // }()
793         if !t.haveInfo() {
794                 return false
795         }
796         if t.pieceComplete(int(r.Index)) {
797                 return true
798         }
799         p := &t.pieces[r.Index]
800         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
801 }
802
803 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
804         return int(cs.Begin / chunkSize)
805 }
806
807 func (t *Torrent) wantPiece(r request) bool {
808         if !t.wantPieceIndex(int(r.Index)) {
809                 return false
810         }
811         if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
812                 return true
813         }
814         // TODO: What about pieces that were wanted, but aren't now, and aren't
815         // completed either? That used to be done here.
816         return false
817 }
818
819 func (t *Torrent) wantPieceIndex(index int) bool {
820         if !t.haveInfo() {
821                 return false
822         }
823         if index < 0 || index >= t.numPieces() {
824                 return false
825         }
826         p := &t.pieces[index]
827         if p.queuedForHash() {
828                 return false
829         }
830         if p.hashing {
831                 return false
832         }
833         if t.pieceComplete(index) {
834                 return false
835         }
836         if t.pendingPieces.Contains(index) {
837                 return true
838         }
839         // log.Printf("piece %d not pending", index)
840         return !t.forReaderOffsetPieces(func(begin, end int) bool {
841                 return index < begin || index >= end
842         })
843 }
844
845 // The worst connection is one that hasn't been sent, or sent anything useful
846 // for the longest. A bad connection is one that usually sends us unwanted
847 // pieces, or has been in worser half of the established connections for more
848 // than a minute.
849 func (t *Torrent) worstBadConn() *connection {
850         wcs := worseConnSlice{t.unclosedConnsAsSlice()}
851         heap.Init(&wcs)
852         for wcs.Len() != 0 {
853                 c := heap.Pop(&wcs).(*connection)
854                 if c.stats.ChunksReadUnwanted.Int64() >= 6 && c.stats.ChunksReadUnwanted.Int64() > c.stats.ChunksReadUseful.Int64() {
855                         return c
856                 }
857                 // If the connection is in the worst half of the established
858                 // connection quota and is older than a minute.
859                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
860                         // Give connections 1 minute to prove themselves.
861                         if time.Since(c.completedHandshake) > time.Minute {
862                                 return c
863                         }
864                 }
865         }
866         return nil
867 }
868
869 type PieceStateChange struct {
870         Index int
871         PieceState
872 }
873
874 func (t *Torrent) publishPieceChange(piece int) {
875         cur := t.pieceState(piece)
876         p := &t.pieces[piece]
877         if cur != p.publicPieceState {
878                 p.publicPieceState = cur
879                 t.pieceStateChanges.Publish(PieceStateChange{
880                         piece,
881                         cur,
882                 })
883         }
884 }
885
886 func (t *Torrent) pieceNumPendingChunks(piece int) int {
887         if t.pieceComplete(piece) {
888                 return 0
889         }
890         return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
891 }
892
893 func (t *Torrent) pieceAllDirty(piece int) bool {
894         return t.pieces[piece].dirtyChunks.Len() == t.pieceNumChunks(piece)
895 }
896
897 func (t *Torrent) readersChanged() {
898         t.updateReaderPieces()
899         t.updateAllPiecePriorities()
900 }
901
902 func (t *Torrent) updateReaderPieces() {
903         t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities()
904 }
905
906 func (t *Torrent) readerPosChanged(from, to pieceRange) {
907         if from == to {
908                 return
909         }
910         t.updateReaderPieces()
911         // Order the ranges, high and low.
912         l, h := from, to
913         if l.begin > h.begin {
914                 l, h = h, l
915         }
916         if l.end < h.begin {
917                 // Two distinct ranges.
918                 t.updatePiecePriorities(l.begin, l.end)
919                 t.updatePiecePriorities(h.begin, h.end)
920         } else {
921                 // Ranges overlap.
922                 end := l.end
923                 if h.end > end {
924                         end = h.end
925                 }
926                 t.updatePiecePriorities(l.begin, end)
927         }
928 }
929
930 func (t *Torrent) maybeNewConns() {
931         // Tickle the accept routine.
932         t.cl.event.Broadcast()
933         t.openNewConns()
934 }
935
936 func (t *Torrent) piecePriorityChanged(piece int) {
937         // log.Printf("piece %d priority changed", piece)
938         for c := range t.conns {
939                 if c.updatePiecePriority(piece) {
940                         // log.Print("conn piece priority changed")
941                         c.updateRequests()
942                 }
943         }
944         t.maybeNewConns()
945         t.publishPieceChange(piece)
946 }
947
948 func (t *Torrent) updatePiecePriority(piece int) {
949         p := &t.pieces[piece]
950         newPrio := p.uncachedPriority()
951         // log.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
952         if newPrio == PiecePriorityNone {
953                 if !t.pendingPieces.Remove(piece) {
954                         return
955                 }
956         } else {
957                 if !t.pendingPieces.Set(piece, newPrio.BitmapPriority()) {
958                         return
959                 }
960         }
961         t.piecePriorityChanged(piece)
962 }
963
964 func (t *Torrent) updateAllPiecePriorities() {
965         t.updatePiecePriorities(0, len(t.pieces))
966 }
967
968 // Update all piece priorities in one hit. This function should have the same
969 // output as updatePiecePriority, but across all pieces.
970 func (t *Torrent) updatePiecePriorities(begin, end int) {
971         for i := begin; i < end; i++ {
972                 t.updatePiecePriority(i)
973         }
974 }
975
976 // Returns the range of pieces [begin, end) that contains the extent of bytes.
977 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
978         if off >= *t.length {
979                 return
980         }
981         if off < 0 {
982                 size += off
983                 off = 0
984         }
985         if size <= 0 {
986                 return
987         }
988         begin = int(off / t.info.PieceLength)
989         end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
990         if end > t.info.NumPieces() {
991                 end = t.info.NumPieces()
992         }
993         return
994 }
995
996 // Returns true if all iterations complete without breaking. Returns the read
997 // regions for all readers. The reader regions should not be merged as some
998 // callers depend on this method to enumerate readers.
999 func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
1000         for r := range t.readers {
1001                 p := r.pieces
1002                 if p.begin >= p.end {
1003                         continue
1004                 }
1005                 if !f(p.begin, p.end) {
1006                         return false
1007                 }
1008         }
1009         return true
1010 }
1011
1012 func (t *Torrent) piecePriority(piece int) piecePriority {
1013         prio, ok := t.pendingPieces.GetPriority(piece)
1014         if !ok {
1015                 return PiecePriorityNone
1016         }
1017         if prio > 0 {
1018                 panic(prio)
1019         }
1020         ret := piecePriority(-prio)
1021         if ret == PiecePriorityNone {
1022                 panic(piece)
1023         }
1024         return ret
1025 }
1026
1027 func (t *Torrent) pendRequest(req request) {
1028         ci := chunkIndex(req.chunkSpec, t.chunkSize)
1029         t.pieces[req.Index].pendChunkIndex(ci)
1030 }
1031
1032 func (t *Torrent) pieceCompletionChanged(piece int) {
1033         log.Call().Add("piece", piece).AddValue(debugLogValue).Log(t.logger)
1034         t.cl.event.Broadcast()
1035         if t.pieceComplete(piece) {
1036                 t.onPieceCompleted(piece)
1037         } else {
1038                 t.onIncompletePiece(piece)
1039         }
1040         t.updatePiecePriority(piece)
1041 }
1042
1043 func (t *Torrent) numReceivedConns() (ret int) {
1044         for c := range t.conns {
1045                 if c.Discovery == peerSourceIncoming {
1046                         ret++
1047                 }
1048         }
1049         return
1050 }
1051
1052 func (t *Torrent) maxHalfOpen() int {
1053         // Note that if we somehow exceed the maximum established conns, we want
1054         // the negative value to have an effect.
1055         establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
1056         extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
1057         // We want to allow some experimentation with new peers, and to try to
1058         // upset an oversupply of received connections.
1059         return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.halfOpenLimit)))
1060 }
1061
1062 func (t *Torrent) openNewConns() {
1063         defer t.updateWantPeersEvent()
1064         for t.peers.Len() != 0 {
1065                 if !t.wantConns() {
1066                         return
1067                 }
1068                 if len(t.halfOpen) >= t.maxHalfOpen() {
1069                         return
1070                 }
1071                 p := t.peers.PopMax()
1072                 t.initiateConn(p)
1073         }
1074 }
1075
1076 func (t *Torrent) getConnPieceInclination() []int {
1077         _ret := t.connPieceInclinationPool.Get()
1078         if _ret == nil {
1079                 pieceInclinationsNew.Add(1)
1080                 return rand.Perm(t.numPieces())
1081         }
1082         pieceInclinationsReused.Add(1)
1083         return *_ret.(*[]int)
1084 }
1085
1086 func (t *Torrent) putPieceInclination(pi []int) {
1087         t.connPieceInclinationPool.Put(&pi)
1088         pieceInclinationsPut.Add(1)
1089 }
1090
1091 func (t *Torrent) updatePieceCompletion(piece int) {
1092         pcu := t.pieceCompleteUncached(piece)
1093         p := &t.pieces[piece]
1094         changed := t.completedPieces.Get(piece) != pcu.Complete || p.storageCompletionOk != pcu.Ok
1095         log.Fmsg("piece %d completion: %v", piece, pcu.Ok).AddValue(debugLogValue).Log(t.logger)
1096         p.storageCompletionOk = pcu.Ok
1097         t.completedPieces.Set(piece, pcu.Complete)
1098         t.tickleReaders()
1099         // log.Printf("piece %d uncached completion: %v", piece, pcu.Complete)
1100         // log.Printf("piece %d changed: %v", piece, changed)
1101         if changed {
1102                 t.pieceCompletionChanged(piece)
1103         }
1104 }
1105
1106 // Non-blocking read. Client lock is not required.
1107 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1108         p := &t.pieces[off/t.info.PieceLength]
1109         p.waitNoPendingWrites()
1110         return p.Storage().ReadAt(b, off-p.Info().Offset())
1111 }
1112
1113 func (t *Torrent) updateAllPieceCompletions() {
1114         for i := range iter.N(t.numPieces()) {
1115                 t.updatePieceCompletion(i)
1116         }
1117 }
1118
1119 // Returns an error if the metadata was completed, but couldn't be set for
1120 // some reason. Blame it on the last peer to contribute.
1121 func (t *Torrent) maybeCompleteMetadata() error {
1122         if t.haveInfo() {
1123                 // Nothing to do.
1124                 return nil
1125         }
1126         if !t.haveAllMetadataPieces() {
1127                 // Don't have enough metadata pieces.
1128                 return nil
1129         }
1130         err := t.setInfoBytes(t.metadataBytes)
1131         if err != nil {
1132                 t.invalidateMetadata()
1133                 return fmt.Errorf("error setting info bytes: %s", err)
1134         }
1135         if t.cl.config.Debug {
1136                 log.Printf("%s: got metadata from peers", t)
1137         }
1138         return nil
1139 }
1140
1141 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1142         t.forReaderOffsetPieces(func(begin, end int) bool {
1143                 ret.AddRange(begin, end)
1144                 return true
1145         })
1146         return
1147 }
1148
1149 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1150         t.forReaderOffsetPieces(func(begin, end int) bool {
1151                 if end > begin {
1152                         now.Add(begin)
1153                         readahead.AddRange(begin+1, end)
1154                 }
1155                 return true
1156         })
1157         return
1158 }
1159
1160 func (t *Torrent) needData() bool {
1161         if t.closed.IsSet() {
1162                 return false
1163         }
1164         if !t.haveInfo() {
1165                 return true
1166         }
1167         return t.pendingPieces.Len() != 0
1168 }
1169
1170 func appendMissingStrings(old, new []string) (ret []string) {
1171         ret = old
1172 new:
1173         for _, n := range new {
1174                 for _, o := range old {
1175                         if o == n {
1176                                 continue new
1177                         }
1178                 }
1179                 ret = append(ret, n)
1180         }
1181         return
1182 }
1183
1184 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1185         ret = existing
1186         for minNumTiers > len(ret) {
1187                 ret = append(ret, nil)
1188         }
1189         return
1190 }
1191
1192 func (t *Torrent) addTrackers(announceList [][]string) {
1193         fullAnnounceList := &t.metainfo.AnnounceList
1194         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1195         for tierIndex, trackerURLs := range announceList {
1196                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1197         }
1198         t.startMissingTrackerScrapers()
1199         t.updateWantPeersEvent()
1200 }
1201
1202 // Don't call this before the info is available.
1203 func (t *Torrent) bytesCompleted() int64 {
1204         if !t.haveInfo() {
1205                 return 0
1206         }
1207         return t.info.TotalLength() - t.bytesLeft()
1208 }
1209
1210 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1211         t.cl.mu.Lock()
1212         defer t.cl.mu.Unlock()
1213         return t.setInfoBytes(b)
1214 }
1215
1216 // Returns true if connection is removed from torrent.Conns.
1217 func (t *Torrent) deleteConnection(c *connection) (ret bool) {
1218         if !c.closed.IsSet() {
1219                 panic("connection is not closed")
1220                 // There are behaviours prevented by the closed state that will fail
1221                 // if the connection has been deleted.
1222         }
1223         _, ret = t.conns[c]
1224         delete(t.conns, c)
1225         torrent.Add("deleted connections", 1)
1226         c.deleteAllRequests()
1227         if len(t.conns) == 0 {
1228                 t.assertNoPendingRequests()
1229         }
1230         return
1231 }
1232
1233 func (t *Torrent) assertNoPendingRequests() {
1234         for _, num := range t.pendingRequests {
1235                 if num != 0 {
1236                         panic(num)
1237                 }
1238         }
1239 }
1240
1241 func (t *Torrent) dropConnection(c *connection) {
1242         t.cl.event.Broadcast()
1243         c.Close()
1244         if t.deleteConnection(c) {
1245                 t.openNewConns()
1246         }
1247 }
1248
1249 func (t *Torrent) wantPeers() bool {
1250         if t.closed.IsSet() {
1251                 return false
1252         }
1253         if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
1254                 return false
1255         }
1256         return t.needData() || t.seeding()
1257 }
1258
1259 func (t *Torrent) updateWantPeersEvent() {
1260         if t.wantPeers() {
1261                 t.wantPeersEvent.Set()
1262         } else {
1263                 t.wantPeersEvent.Clear()
1264         }
1265 }
1266
1267 // Returns whether the client should make effort to seed the torrent.
1268 func (t *Torrent) seeding() bool {
1269         cl := t.cl
1270         if t.closed.IsSet() {
1271                 return false
1272         }
1273         if cl.config.NoUpload {
1274                 return false
1275         }
1276         if !cl.config.Seed {
1277                 return false
1278         }
1279         if cl.config.DisableAggressiveUpload && t.needData() {
1280                 return false
1281         }
1282         return true
1283 }
1284
1285 func (t *Torrent) startScrapingTracker(_url string) {
1286         if _url == "" {
1287                 return
1288         }
1289         u, _ := url.Parse(_url)
1290         if u.Scheme == "udp" {
1291                 u.Scheme = "udp4"
1292                 t.startScrapingTracker(u.String())
1293                 u.Scheme = "udp6"
1294                 t.startScrapingTracker(u.String())
1295                 return
1296         }
1297         if u.Scheme == "udp4" && (t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4) {
1298                 return
1299         }
1300         if u.Scheme == "udp6" && t.cl.config.DisableIPv6 {
1301                 return
1302         }
1303         if _, ok := t.trackerAnnouncers[_url]; ok {
1304                 return
1305         }
1306         newAnnouncer := &trackerScraper{
1307                 u: *u,
1308                 t: t,
1309         }
1310         if t.trackerAnnouncers == nil {
1311                 t.trackerAnnouncers = make(map[string]*trackerScraper)
1312         }
1313         t.trackerAnnouncers[_url] = newAnnouncer
1314         go newAnnouncer.Run()
1315 }
1316
1317 // Adds and starts tracker scrapers for tracker URLs that aren't already
1318 // running.
1319 func (t *Torrent) startMissingTrackerScrapers() {
1320         if t.cl.config.DisableTrackers {
1321                 return
1322         }
1323         t.startScrapingTracker(t.metainfo.Announce)
1324         for _, tier := range t.metainfo.AnnounceList {
1325                 for _, url := range tier {
1326                         t.startScrapingTracker(url)
1327                 }
1328         }
1329 }
1330
1331 // Returns an AnnounceRequest with fields filled out to defaults and current
1332 // values.
1333 func (t *Torrent) announceRequest() tracker.AnnounceRequest {
1334         // Note that IPAddress is not set. It's set for UDP inside the tracker
1335         // code, since it's dependent on the network in use.
1336         return tracker.AnnounceRequest{
1337                 Event:    tracker.None,
1338                 NumWant:  -1,
1339                 Port:     uint16(t.cl.incomingPeerPort()),
1340                 PeerId:   t.cl.peerID,
1341                 InfoHash: t.infoHash,
1342                 Key:      t.cl.announceKey(),
1343
1344                 // The following are vaguely described in BEP 3.
1345
1346                 Left:     t.bytesLeftAnnounce(),
1347                 Uploaded: t.stats.BytesWrittenData.Int64(),
1348                 // There's no mention of wasted or unwanted download in the BEP.
1349                 Downloaded: t.stats.BytesReadUsefulData.Int64(),
1350         }
1351 }
1352
1353 // Adds peers revealed in an announce until the announce ends, or we have
1354 // enough peers.
1355 func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
1356         cl := t.cl
1357         // Count all the unique addresses we got during this announce.
1358         allAddrs := make(map[string]struct{})
1359         for {
1360                 select {
1361                 case v, ok := <-pvs:
1362                         if !ok {
1363                                 return
1364                         }
1365                         addPeers := make([]Peer, 0, len(v.Peers))
1366                         for _, cp := range v.Peers {
1367                                 if cp.Port == 0 {
1368                                         // Can't do anything with this.
1369                                         continue
1370                                 }
1371                                 addPeers = append(addPeers, Peer{
1372                                         IP:     cp.IP[:],
1373                                         Port:   cp.Port,
1374                                         Source: peerSourceDHTGetPeers,
1375                                 })
1376                                 key := (&net.UDPAddr{
1377                                         IP:   cp.IP[:],
1378                                         Port: cp.Port,
1379                                 }).String()
1380                                 allAddrs[key] = struct{}{}
1381                         }
1382                         cl.mu.Lock()
1383                         t.addPeers(addPeers)
1384                         numPeers := t.peers.Len()
1385                         cl.mu.Unlock()
1386                         if numPeers >= cl.config.TorrentPeersHighWater {
1387                                 return
1388                         }
1389                 case <-t.closed.LockedChan(&cl.mu):
1390                         return
1391                 }
1392         }
1393 }
1394
1395 func (t *Torrent) announceDHT(impliedPort bool, s *dht.Server) (err error) {
1396         cl := t.cl
1397         ps, err := s.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort)
1398         if err != nil {
1399                 return
1400         }
1401         t.consumeDHTAnnounce(ps.Peers)
1402         ps.Close()
1403         return
1404 }
1405
1406 func (t *Torrent) dhtAnnouncer(s *dht.Server) {
1407         cl := t.cl
1408         for {
1409                 select {
1410                 case <-t.wantPeersEvent.LockedChan(&cl.mu):
1411                 case <-t.closed.LockedChan(&cl.mu):
1412                         return
1413                 }
1414                 err := t.announceDHT(true, s)
1415                 func() {
1416                         cl.mu.Lock()
1417                         defer cl.mu.Unlock()
1418                         if err == nil {
1419                                 t.numDHTAnnounces++
1420                         } else {
1421                                 log.Printf("error announcing %q to DHT: %s", t, err)
1422                         }
1423                 }()
1424                 select {
1425                 case <-t.closed.LockedChan(&cl.mu):
1426                         return
1427                 case <-time.After(5 * time.Minute):
1428                 }
1429         }
1430 }
1431
1432 func (t *Torrent) addPeers(peers []Peer) {
1433         for _, p := range peers {
1434                 t.addPeer(p)
1435         }
1436 }
1437
1438 func (t *Torrent) Stats() TorrentStats {
1439         t.cl.mu.Lock()
1440         defer t.cl.mu.Unlock()
1441         return t.statsLocked()
1442 }
1443
1444 func (t *Torrent) statsLocked() (ret TorrentStats) {
1445         ret.ActivePeers = len(t.conns)
1446         ret.HalfOpenPeers = len(t.halfOpen)
1447         ret.PendingPeers = t.peers.Len()
1448         ret.TotalPeers = t.numTotalPeers()
1449         ret.ConnectedSeeders = 0
1450         for c := range t.conns {
1451                 if all, ok := c.peerHasAllPieces(); all && ok {
1452                         ret.ConnectedSeeders++
1453                 }
1454         }
1455         ret.ConnStats = t.stats.Copy()
1456         return
1457 }
1458
1459 // The total number of peers in the torrent.
1460 func (t *Torrent) numTotalPeers() int {
1461         peers := make(map[string]struct{})
1462         for conn := range t.conns {
1463                 ra := conn.conn.RemoteAddr()
1464                 if ra == nil {
1465                         // It's been closed and doesn't support RemoteAddr.
1466                         continue
1467                 }
1468                 peers[ra.String()] = struct{}{}
1469         }
1470         for addr := range t.halfOpen {
1471                 peers[addr] = struct{}{}
1472         }
1473         t.peers.Each(func(peer Peer) {
1474                 peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{}
1475         })
1476         return len(peers)
1477 }
1478
1479 // Reconcile bytes transferred before connection was associated with a
1480 // torrent.
1481 func (t *Torrent) reconcileHandshakeStats(c *connection) {
1482         if c.stats != (ConnStats{
1483                 // Handshakes should only increment these fields:
1484                 BytesWritten: c.stats.BytesWritten,
1485                 BytesRead:    c.stats.BytesRead,
1486         }) {
1487                 panic("bad stats")
1488         }
1489         c.postHandshakeStats(func(cs *ConnStats) {
1490                 cs.BytesRead.Add(c.stats.BytesRead.Int64())
1491                 cs.BytesWritten.Add(c.stats.BytesWritten.Int64())
1492         })
1493         c.reconciledHandshakeStats = true
1494 }
1495
1496 // Returns true if the connection is added.
1497 func (t *Torrent) addConnection(c *connection) (err error) {
1498         defer func() {
1499                 if err == nil {
1500                         torrent.Add("added connections", 1)
1501                 }
1502         }()
1503         if t.closed.IsSet() {
1504                 return errors.New("torrent closed")
1505         }
1506         if !t.wantConns() {
1507                 return errors.New("don't want conns")
1508         }
1509         if len(t.conns) >= t.maxEstablishedConns {
1510                 c := t.worstBadConn()
1511                 if t.cl.config.Debug && missinggo.CryHeard() {
1512                         log.Printf("%s: dropping connection to make room for new one:\n    %v", t, c)
1513                 }
1514                 c.Close()
1515                 t.deleteConnection(c)
1516         }
1517         if len(t.conns) >= t.maxEstablishedConns {
1518                 panic(len(t.conns))
1519         }
1520         t.conns[c] = struct{}{}
1521         return nil
1522 }
1523
1524 func (t *Torrent) wantConns() bool {
1525         if !t.networkingEnabled {
1526                 return false
1527         }
1528         if t.closed.IsSet() {
1529                 return false
1530         }
1531         if !t.seeding() && !t.needData() {
1532                 return false
1533         }
1534         if len(t.conns) < t.maxEstablishedConns {
1535                 return true
1536         }
1537         return t.worstBadConn() != nil
1538 }
1539
1540 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1541         t.cl.mu.Lock()
1542         defer t.cl.mu.Unlock()
1543         oldMax = t.maxEstablishedConns
1544         t.maxEstablishedConns = max
1545         wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
1546         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1547                 t.dropConnection(wcs.Pop().(*connection))
1548         }
1549         t.openNewConns()
1550         return oldMax
1551 }
1552
1553 func (t *Torrent) mu() missinggo.RWLocker {
1554         return &t.cl.mu
1555 }
1556
1557 func (t *Torrent) pieceHashed(piece int, correct bool) {
1558         log.Fmsg("hashed piece %d", piece).Add("piece", piece).Add("passed", correct).AddValue(debugLogValue).Log(t.logger)
1559         if t.closed.IsSet() {
1560                 return
1561         }
1562         p := &t.pieces[piece]
1563         touchers := t.reapPieceTouchers(piece)
1564         if p.storageCompletionOk {
1565                 // Don't score the first time a piece is hashed, it could be an
1566                 // initial check.
1567                 if correct {
1568                         pieceHashedCorrect.Add(1)
1569                 } else {
1570                         log.Printf("%s: piece %d (%s) failed hash: %d connections contributed", t, piece, p.hash, len(touchers))
1571                         pieceHashedNotCorrect.Add(1)
1572                 }
1573         }
1574         if correct {
1575                 if len(touchers) != 0 {
1576                         // Don't increment stats above connection-level for every involved
1577                         // connection.
1578                         t.allStats((*ConnStats).incrementPiecesDirtiedGood)
1579                 }
1580                 for _, c := range touchers {
1581                         c.stats.incrementPiecesDirtiedGood()
1582                 }
1583                 err := p.Storage().MarkComplete()
1584                 if err != nil {
1585                         log.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
1586                 }
1587         } else {
1588                 if len(touchers) != 0 {
1589                         // Don't increment stats above connection-level for every involved
1590                         // connection.
1591                         t.allStats((*ConnStats).incrementPiecesDirtiedBad)
1592                         for _, c := range touchers {
1593                                 // Y u do dis peer?!
1594                                 c.stats.incrementPiecesDirtiedBad()
1595                         }
1596                         slices.Sort(touchers, connLessTrusted)
1597                         if t.cl.config.Debug {
1598                                 log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int64) {
1599                                         for _, c := range touchers {
1600                                                 ret = append(ret, c.netGoodPiecesDirtied())
1601                                         }
1602                                         return
1603                                 }())
1604                         }
1605                         c := touchers[0]
1606                         t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
1607                         c.Drop()
1608                 }
1609                 t.onIncompletePiece(piece)
1610                 p.Storage().MarkNotComplete()
1611         }
1612         t.updatePieceCompletion(piece)
1613 }
1614
1615 func (t *Torrent) cancelRequestsForPiece(piece int) {
1616         // TODO: Make faster
1617         for cn := range t.conns {
1618                 cn.tickleWriter()
1619         }
1620 }
1621
1622 func (t *Torrent) onPieceCompleted(piece int) {
1623         t.pendAllChunkSpecs(piece)
1624         t.cancelRequestsForPiece(piece)
1625         for conn := range t.conns {
1626                 conn.Have(piece)
1627         }
1628 }
1629
1630 // Called when a piece is found to be not complete.
1631 func (t *Torrent) onIncompletePiece(piece int) {
1632         if t.pieceAllDirty(piece) {
1633                 t.pendAllChunkSpecs(piece)
1634         }
1635         if !t.wantPieceIndex(piece) {
1636                 // log.Printf("piece %d incomplete and unwanted", piece)
1637                 return
1638         }
1639         // We could drop any connections that we told we have a piece that we
1640         // don't here. But there's a test failure, and it seems clients don't care
1641         // if you request pieces that you already claim to have. Pruning bad
1642         // connections might just remove any connections that aren't treating us
1643         // favourably anyway.
1644
1645         // for c := range t.conns {
1646         //      if c.sentHave(piece) {
1647         //              c.Drop()
1648         //      }
1649         // }
1650         for conn := range t.conns {
1651                 if conn.PeerHasPiece(piece) {
1652                         conn.updateRequests()
1653                 }
1654         }
1655 }
1656
1657 func (t *Torrent) verifyPiece(piece int) {
1658         cl := t.cl
1659         cl.mu.Lock()
1660         defer cl.mu.Unlock()
1661         p := &t.pieces[piece]
1662         defer func() {
1663                 p.numVerifies++
1664                 cl.event.Broadcast()
1665         }()
1666         for p.hashing || t.storage == nil {
1667                 cl.event.Wait()
1668         }
1669         if !p.t.piecesQueuedForHash.Remove(piece) {
1670                 panic("piece was not queued")
1671         }
1672         if t.closed.IsSet() || t.pieceComplete(piece) {
1673                 t.updatePiecePriority(piece)
1674                 return
1675         }
1676         p.hashing = true
1677         t.publishPieceChange(piece)
1678         t.storageLock.RLock()
1679         cl.mu.Unlock()
1680         sum := t.hashPiece(piece)
1681         t.storageLock.RUnlock()
1682         cl.mu.Lock()
1683         p.hashing = false
1684         t.pieceHashed(piece, sum == p.hash)
1685         t.publishPieceChange(piece)
1686 }
1687
1688 // Return the connections that touched a piece, and clear the entries while
1689 // doing it.
1690 func (t *Torrent) reapPieceTouchers(piece int) (ret []*connection) {
1691         for c := range t.pieces[piece].dirtiers {
1692                 delete(c.peerTouchedPieces, piece)
1693                 ret = append(ret, c)
1694         }
1695         t.pieces[piece].dirtiers = nil
1696         return
1697 }
1698
1699 func (t *Torrent) connsAsSlice() (ret []*connection) {
1700         for c := range t.conns {
1701                 ret = append(ret, c)
1702         }
1703         return
1704 }
1705
1706 // Currently doesn't really queue, but should in the future.
1707 func (t *Torrent) queuePieceCheck(pieceIndex int) {
1708         piece := &t.pieces[pieceIndex]
1709         if piece.queuedForHash() {
1710                 return
1711         }
1712         t.piecesQueuedForHash.Add(pieceIndex)
1713         t.publishPieceChange(pieceIndex)
1714         go t.verifyPiece(pieceIndex)
1715 }
1716
1717 func (t *Torrent) VerifyData() {
1718         for i := range iter.N(t.NumPieces()) {
1719                 t.Piece(i).VerifyData()
1720         }
1721 }
1722
1723 // Start the process of connecting to the given peer for the given torrent if
1724 // appropriate.
1725 func (t *Torrent) initiateConn(peer Peer) {
1726         if peer.Id == t.cl.peerID {
1727                 return
1728         }
1729         if t.cl.badPeerIPPort(peer.IP, peer.Port) {
1730                 return
1731         }
1732         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
1733         if t.addrActive(addr) {
1734                 return
1735         }
1736         t.halfOpen[addr] = peer
1737         go t.cl.outgoingConnection(t, addr, peer.Source)
1738 }
1739
1740 func (t *Torrent) AddClientPeer(cl *Client) {
1741         t.AddPeers(func() (ps []Peer) {
1742                 for _, la := range cl.ListenAddrs() {
1743                         ps = append(ps, Peer{
1744                                 IP:   missinggo.AddrIP(la),
1745                                 Port: missinggo.AddrPort(la),
1746                         })
1747                 }
1748                 return
1749         }())
1750 }
1751
1752 // All stats that include this Torrent. Useful when we want to increment
1753 // ConnStats but not for every connection.
1754 func (t *Torrent) allStats(f func(*ConnStats)) {
1755         f(&t.stats)
1756         f(&t.cl.stats)
1757 }