17 "github.com/davecgh/go-spew/spew"
19 "github.com/anacrolix/dht/v2"
20 "github.com/anacrolix/log"
21 "github.com/anacrolix/missinggo"
22 "github.com/anacrolix/missinggo/bitmap"
23 "github.com/anacrolix/missinggo/perf"
24 "github.com/anacrolix/missinggo/prioritybitmap"
25 "github.com/anacrolix/missinggo/pubsub"
26 "github.com/anacrolix/missinggo/slices"
28 "github.com/anacrolix/torrent/bencode"
29 "github.com/anacrolix/torrent/metainfo"
30 pp "github.com/anacrolix/torrent/peer_protocol"
31 "github.com/anacrolix/torrent/storage"
32 "github.com/anacrolix/torrent/tracker"
35 func (t *Torrent) chunkIndexSpec(chunkIndex pp.Integer, piece pieceIndex) chunkSpec {
36 return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
39 // Maintains state of torrent within a Client.
41 // Torrent-level aggregate statistics. First in struct to ensure 64-bit
42 // alignment. See #262.
47 networkingEnabled bool
49 // Determines what chunks to request from peers. 1: Favour higher priority
50 // pieces with some fuzzing to reduce overlaps and wastage across
51 // connections. 2: The fastest connection downloads strictly in order of
52 priority, while all others adhere to their piece inclinations. 3:
53 // Requests are strictly by piece priority, and not duplicated until
54 // duplicateRequestTimeout is reached.
56 // How long to avoid duplicating a pending request.
57 duplicateRequestTimeout time.Duration
59 closed missinggo.Event
60 infoHash metainfo.Hash
62 // Values are the piece indices that changed.
63 pieceStateChanges *pubsub.PubSub
64 // The size of chunks to request from peers over the wire. This is
65 // normally 16KiB by convention these days.
68 // Total length of the torrent in bytes. Stored because it's not O(1) to
69 // get this from the info dict.
72 // The storage to open when the info dict becomes available.
73 storageOpener *storage.Client
74 // Storage for torrent data.
75 storage *storage.Torrent
76 // Read-locked for using storage, and write-locked for Closing.
77 storageLock sync.RWMutex
79 // TODO: Only announce stuff is used?
80 metainfo metainfo.MetaInfo
82 // The info dict. nil if we don't have it (yet).
86 // Active peer connections, running message stream loops. TODO: Make this
87 // open (not-closed) connections only.
88 conns map[*connection]struct{}
89 maxEstablishedConns int
90 // Set of addrs to which we're attempting to connect. Connections are
91 // half-open until all handshakes are completed.
92 halfOpen map[string]Peer
93 fastestConn *connection
95 // Reserve of peers to connect to. A peer can be both here and in the
96 active connections if we're told about the peer after connecting with
97 // them. That encourages us to reconnect to peers that are well known in
99 peers prioritizedPeers
100 wantPeersEvent missinggo.Event
101 // An announcer for each tracker URL.
102 trackerAnnouncers map[string]*trackerScraper
103 // How many times we've initiated a DHT announce. TODO: Move into stats.
106 // Name used if the info name isn't available. Should be cleared when the
107 // Info does become available.
111 // The bencoded bytes of the info dict. This is actively manipulated if
112 // the info bytes aren't initially available, and we try to fetch them
115 // Each element corresponds to the 16KiB metadata pieces. If true, we have
116 // received that piece.
117 metadataCompletedChunks []bool
118 metadataChanged sync.Cond
120 // Set when .Info is obtained.
121 gotMetainfo missinggo.Event
123 readers map[*reader]struct{}
124 readerNowPieces bitmap.Bitmap
125 readerReadaheadPieces bitmap.Bitmap
127 // A cache of pieces we need to get. Calculated from various piece and
128 // file priorities and completion states elsewhere.
129 pendingPieces prioritybitmap.PriorityBitmap
130 // A cache of completed piece indices.
131 completedPieces bitmap.Bitmap
132 // Pieces that need to be hashed.
133 piecesQueuedForHash bitmap.Bitmap
135 // A pool of piece priorities []int for assignment to new connections.
136 // These "inclinations" are used to give connections preference for
138 connPieceInclinationPool sync.Pool
140 // Count of each request across active connections.
141 pendingRequests map[request]int
142 // The last time we requested a chunk. Deleting the request from any
143 // connection will clear this value.
144 lastRequested map[request]*time.Timer
147 func (t *Torrent) tickleReaders() {
148 t.cl.event.Broadcast()
151 // Returns a channel that is closed when the Torrent is closed.
152 func (t *Torrent) Closed() <-chan struct{} {
153 return t.closed.LockedChan(t.cl.locker())
156 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
157 // pending, and half-open peers.
158 func (t *Torrent) KnownSwarm() (ks []Peer) {
159 // Add pending peers to the list
160 t.peers.Each(func(peer Peer) {
161 ks = append(ks, peer)
164 // Add half-open peers to the list
165 for _, peer := range t.halfOpen {
166 ks = append(ks, peer)
169 // Add active peers to the list
170 for conn := range t.conns {
172 ks = append(ks, Peer{
174 IP: conn.remoteAddr.IP,
175 Port: int(conn.remoteAddr.Port),
176 Source: conn.Discovery,
177 // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
178 // > But if we're not connected to them with an encrypted connection, I couldn't say
179 // > what's appropriate. We can carry forward the SupportsEncryption value as we
180 // > received it from trackers/DHT/PEX, or just use the encryption state for the
181 // > connection. It's probably easiest to do the latter for now.
182 // https://github.com/anacrolix/torrent/pull/188
183 SupportsEncryption: conn.headerEncrypted,
190 func (t *Torrent) setChunkSize(size pp.Integer) {
192 t.chunkPool = &sync.Pool{
193 New: func() interface{} {
194 b := make([]byte, size)
200 func (t *Torrent) pieceComplete(piece pieceIndex) bool {
201 return t.completedPieces.Get(bitmap.BitIndex(piece))
204 func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
205 return t.pieces[piece].Storage().Completion()
208 // There's a connection to that address already.
209 func (t *Torrent) addrActive(addr string) bool {
210 if _, ok := t.halfOpen[addr]; ok {
213 for c := range t.conns {
215 if ra.String() == addr {
222 func (t *Torrent) unclosedConnsAsSlice() (ret []*connection) {
223 ret = make([]*connection, 0, len(t.conns))
224 for c := range t.conns {
225 if !c.closed.IsSet() {
232 func (t *Torrent) addPeer(p Peer) {
234 peersAddedBySource.Add(string(p.Source), 1)
235 if t.closed.IsSet() {
238 if cl.badPeerIPPort(p.IP, p.Port) {
239 torrent.Add("peers not added because of bad addr", 1)
243 torrent.Add("peers replaced", 1)
246 for t.peers.Len() > cl.config.TorrentPeersHighWater {
247 _, ok := t.peers.DeleteMin()
249 torrent.Add("excess reserve peers discarded", 1)
254 func (t *Torrent) invalidateMetadata() {
255 for i := range t.metadataCompletedChunks {
256 t.metadataCompletedChunks[i] = false
263 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
267 if index >= len(t.metadataCompletedChunks) {
268 t.logger.Printf("%s: ignoring metadata piece %d", t, index)
271 copy(t.metadataBytes[(1<<14)*index:], data)
272 t.metadataCompletedChunks[index] = true
275 func (t *Torrent) metadataPieceCount() int {
276 return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
279 func (t *Torrent) haveMetadataPiece(piece int) bool {
281 return (1<<14)*piece < len(t.metadataBytes)
283 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
287 func (t *Torrent) metadataSize() int {
288 return len(t.metadataBytes)
291 func infoPieceHashes(info *metainfo.Info) (ret [][]byte) {
292 for i := 0; i < len(info.Pieces); i += sha1.Size {
293 ret = append(ret, info.Pieces[i:i+sha1.Size])
298 func (t *Torrent) makePieces() {
299 hashes := infoPieceHashes(t.info)
300 t.pieces = make([]Piece, len(hashes), len(hashes))
301 for i, hash := range hashes {
302 piece := &t.pieces[i]
304 piece.index = pieceIndex(i)
305 piece.noPendingWrites.L = &piece.pendingWritesMutex
306 piece.hash = (*metainfo.Hash)(unsafe.Pointer(&hash[0]))
308 beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
309 endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
310 piece.files = files[beginFile:endFile]
314 // Returns the index of the first file containing the piece. files must be
315 // ordered by offset.
316 func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
317 for i, f := range files {
318 if f.offset+f.length > pieceOffset {
325 // Returns the index after the last file containing the piece. files must be
326 // ordered by offset.
327 func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
328 for i, f := range files {
329 if f.offset+f.length >= pieceEndOffset {
336 func (t *Torrent) cacheLength() {
338 for _, f := range t.info.UpvertedFiles() {
344 func (t *Torrent) setInfo(info *metainfo.Info) error {
345 if err := validateInfo(info); err != nil {
346 return fmt.Errorf("bad info: %s", err)
348 if t.storageOpener != nil {
350 t.storage, err = t.storageOpener.OpenTorrent(info, t.infoHash)
352 return fmt.Errorf("error opening torrent storage: %s", err)
358 t.displayName = "" // Save a few bytes lol.
365 func (t *Torrent) onSetInfo() {
366 for conn := range t.conns {
367 if err := conn.setNumPieces(t.numPieces()); err != nil {
368 t.logger.Printf("closing connection: %s", err)
372 for i := range t.pieces {
373 t.updatePieceCompletion(pieceIndex(i))
375 if !p.storageCompletionOk {
376 // t.logger.Printf("piece %s completion unknown, queueing check", p)
377 t.queuePieceCheck(pieceIndex(i))
380 t.cl.event.Broadcast()
382 t.updateWantPeersEvent()
383 t.pendingRequests = make(map[request]int)
384 t.lastRequested = make(map[request]*time.Timer)
387 // Called when metadata for a torrent becomes available.
388 func (t *Torrent) setInfoBytes(b []byte) error {
389 if metainfo.HashBytes(b) != t.infoHash {
390 return errors.New("info bytes have wrong hash")
392 var info metainfo.Info
393 if err := bencode.Unmarshal(b, &info); err != nil {
394 return fmt.Errorf("error unmarshalling info bytes: %s", err)
396 if err := t.setInfo(&info); err != nil {
400 t.metadataCompletedChunks = nil
405 func (t *Torrent) haveAllMetadataPieces() bool {
409 if t.metadataCompletedChunks == nil {
412 for _, have := range t.metadataCompletedChunks {
420 // TODO: Propagate errors to disconnect peer.
421 func (t *Torrent) setMetadataSize(bytes int) (err error) {
423 // We already know the correct metadata size.
426 if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
427 return errors.New("bad size")
429 if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
432 t.metadataBytes = make([]byte, bytes)
433 t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
434 t.metadataChanged.Broadcast()
435 for c := range t.conns {
436 c.requestPendingMetadata()
441 // The current working name for the torrent. Either the name in the info dict,
442 // or a display name given such as by the dn value in a magnet link, or "".
443 func (t *Torrent) name() string {
445 defer t.nameMu.RUnlock()
452 func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) {
453 p := &t.pieces[index]
454 ret.Priority = t.piecePriority(index)
455 ret.Completion = p.completion()
456 if p.queuedForHash() || p.hashing {
459 if !ret.Complete && t.piecePartiallyDownloaded(index) {
465 func (t *Torrent) metadataPieceSize(piece int) int {
466 return metadataPieceSize(len(t.metadataBytes), piece)
469 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
475 d["total_size"] = len(t.metadataBytes)
477 p := bencode.MustMarshal(d)
480 ExtendedID: c.PeerExtensionIDs[pp.ExtensionNameMetadata],
481 ExtendedPayload: append(p, data...),
485 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
486 rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
487 ret = append(ret, PieceStateRun{
488 PieceState: el.(PieceState),
492 for index := range t.pieces {
493 rle.Append(t.pieceState(pieceIndex(index)), 1)
499 // Produces a small string representing a PieceStateRun.
500 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
501 ret = fmt.Sprintf("%d", psr.Length)
502 ret += func() string {
503 switch psr.Priority {
504 case PiecePriorityNext:
506 case PiecePriorityNormal:
508 case PiecePriorityReadahead:
510 case PiecePriorityNow:
512 case PiecePriorityHigh:
533 func (t *Torrent) writeStatus(w io.Writer) {
534 fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
535 fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
537 fmt.Fprintf(w, "Metadata have: ")
538 for _, h := range t.metadataCompletedChunks {
539 fmt.Fprintf(w, "%c", func() rune {
549 fmt.Fprintf(w, "Piece length: %s\n", func() string {
551 return fmt.Sprint(t.usualPieceSize())
557 fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
558 fmt.Fprint(w, "Piece States:")
559 for _, psr := range t.pieceStateRuns() {
561 w.Write([]byte(pieceStateRunStatusChars(psr)))
565 fmt.Fprintf(w, "Reader Pieces:")
566 t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
567 fmt.Fprintf(w, " %d:%d", begin, end)
572 fmt.Fprintf(w, "Enabled trackers:\n")
574 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
575 fmt.Fprintf(tw, " URL\tNext announce\tLast announce\n")
576 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
577 return l.u.String() < r.u.String()
578 }).([]*trackerScraper) {
579 fmt.Fprintf(tw, " %s\n", ta.statusLine())
584 fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
586 spew.NewDefaultConfig()
587 spew.Fdump(w, t.statsLocked())
589 conns := t.connsAsSlice()
590 slices.Sort(conns, worseConn)
591 for i, c := range conns {
592 fmt.Fprintf(w, "%2d. ", i+1)
597 func (t *Torrent) haveInfo() bool {
601 // Returns a run-time generated MetaInfo that includes the info bytes and
602 // announce-list as currently known to the client.
603 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
604 return metainfo.MetaInfo{
605 CreationDate: time.Now().Unix(),
606 Comment: "dynamic metainfo from client",
607 CreatedBy: "go.torrent",
608 AnnounceList: t.metainfo.UpvertedAnnounceList(),
609 InfoBytes: func() []byte {
611 return t.metadataBytes
619 func (t *Torrent) BytesMissing() int64 {
622 return t.bytesMissingLocked()
625 func (t *Torrent) bytesMissingLocked() int64 {
629 func (t *Torrent) bytesLeft() (left int64) {
630 bitmap.Flip(t.completedPieces, 0, bitmap.BitIndex(t.numPieces())).IterTyped(func(piece int) bool {
631 p := &t.pieces[piece]
632 left += int64(p.length() - p.numDirtyBytes())
638 // Bytes left to give in tracker announces.
639 func (t *Torrent) bytesLeftAnnounce() int64 {
647 func (t *Torrent) piecePartiallyDownloaded(piece pieceIndex) bool {
648 if t.pieceComplete(piece) {
651 if t.pieceAllDirty(piece) {
654 return t.pieces[piece].hasDirtyChunks()
657 func (t *Torrent) usualPieceSize() int {
658 return int(t.info.PieceLength)
661 func (t *Torrent) numPieces() pieceIndex {
662 return pieceIndex(t.info.NumPieces())
665 func (t *Torrent) numPiecesCompleted() (num int) {
666 return t.completedPieces.Len()
669 func (t *Torrent) close() (err error) {
672 if t.storage != nil {
675 t.storageLock.Unlock()
677 for conn := range t.conns {
680 t.cl.event.Broadcast()
681 t.pieceStateChanges.Close()
682 t.updateWantPeersEvent()
686 func (t *Torrent) requestOffset(r request) int64 {
687 return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
690 // Return the request that would include the given offset into the torrent
691 // data. Returns !ok if there is no such request.
692 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
693 return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
696 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
697 defer perf.ScopeTimerErr(&err)()
698 n, err := t.pieces[piece].Storage().WriteAt(data, begin)
699 if err == nil && n != len(data) {
700 err = io.ErrShortWrite
705 func (t *Torrent) bitfield() (bf []bool) {
706 bf = make([]bool, t.numPieces())
707 t.completedPieces.IterTyped(func(piece int) (again bool) {
714 func (t *Torrent) pieceNumChunks(piece pieceIndex) pp.Integer {
715 return (t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize
718 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
719 t.pieces[pieceIndex].dirtyChunks.Clear()
722 func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
723 if t.info.PieceLength == 0 {
724 // There will be no variance amongst pieces. Only pain.
727 if piece == t.numPieces()-1 {
728 ret := pp.Integer(*t.length % t.info.PieceLength)
733 return pp.Integer(t.info.PieceLength)
736 func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash) {
737 hash := pieceHash.New()
738 p := &t.pieces[piece]
739 p.waitNoPendingWrites()
740 ip := t.info.Piece(int(piece))
742 n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
744 missinggo.CopyExact(&ret, hash.Sum(nil))
747 if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
748 t.logger.Printf("unexpected error hashing piece %d through %T: %s", piece, t.storage.TorrentImpl, err)
753 func (t *Torrent) haveAnyPieces() bool {
754 return t.completedPieces.Len() != 0
757 func (t *Torrent) haveAllPieces() bool {
761 return t.completedPieces.Len() == bitmap.BitIndex(t.numPieces())
764 func (t *Torrent) havePiece(index pieceIndex) bool {
765 return t.haveInfo() && t.pieceComplete(index)
768 func (t *Torrent) haveChunk(r request) (ret bool) {
770 // log.Println("have chunk", r, ret)
775 if t.pieceComplete(pieceIndex(r.Index)) {
778 p := &t.pieces[r.Index]
779 return !p.pendingChunk(r.chunkSpec, t.chunkSize)
782 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
783 return int(cs.Begin / chunkSize)
786 func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
790 if index < 0 || index >= t.numPieces() {
793 p := &t.pieces[index]
794 if p.queuedForHash() {
800 if t.pieceComplete(index) {
803 if t.pendingPieces.Contains(bitmap.BitIndex(index)) {
806 // t.logger.Printf("piece %d not pending", index)
807 return !t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
808 return index < begin || index >= end
812 // The worst connection is one that hasn't been sent, or sent anything useful
813 // for the longest. A bad connection is one that usually sends us unwanted
814 pieces, or has been in the worse half of the established connections for more
816 func (t *Torrent) worstBadConn() *connection {
817 wcs := worseConnSlice{t.unclosedConnsAsSlice()}
820 c := heap.Pop(&wcs).(*connection)
821 if c.stats.ChunksReadWasted.Int64() >= 6 && c.stats.ChunksReadWasted.Int64() > c.stats.ChunksReadUseful.Int64() {
824 // If the connection is in the worst half of the established
825 // connection quota and is older than a minute.
826 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
827 // Give connections 1 minute to prove themselves.
828 if time.Since(c.completedHandshake) > time.Minute {
836 type PieceStateChange struct {
841 func (t *Torrent) publishPieceChange(piece pieceIndex) {
842 cur := t.pieceState(piece)
843 p := &t.pieces[piece]
844 if cur != p.publicPieceState {
845 p.publicPieceState = cur
846 t.pieceStateChanges.Publish(PieceStateChange{
853 func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
854 if t.pieceComplete(piece) {
857 return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
860 func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
861 return t.pieces[piece].dirtyChunks.Len() == int(t.pieceNumChunks(piece))
864 func (t *Torrent) readersChanged() {
865 t.updateReaderPieces()
866 t.updateAllPiecePriorities()
869 func (t *Torrent) updateReaderPieces() {
870 t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities()
873 func (t *Torrent) readerPosChanged(from, to pieceRange) {
877 t.updateReaderPieces()
878 // Order the ranges, high and low.
880 if l.begin > h.begin {
884 // Two distinct ranges.
885 t.updatePiecePriorities(l.begin, l.end)
886 t.updatePiecePriorities(h.begin, h.end)
893 t.updatePiecePriorities(l.begin, end)
897 func (t *Torrent) maybeNewConns() {
898 // Tickle the accept routine.
899 t.cl.event.Broadcast()
903 func (t *Torrent) piecePriorityChanged(piece pieceIndex) {
904 // t.logger.Printf("piece %d priority changed", piece)
905 for c := range t.conns {
906 if c.updatePiecePriority(piece) {
907 // log.Print("conn piece priority changed")
912 t.publishPieceChange(piece)
915 func (t *Torrent) updatePiecePriority(piece pieceIndex) {
916 p := &t.pieces[piece]
917 newPrio := p.uncachedPriority()
918 // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
919 if newPrio == PiecePriorityNone {
920 if !t.pendingPieces.Remove(bitmap.BitIndex(piece)) {
924 if !t.pendingPieces.Set(bitmap.BitIndex(piece), newPrio.BitmapPriority()) {
928 t.piecePriorityChanged(piece)
931 func (t *Torrent) updateAllPiecePriorities() {
932 t.updatePiecePriorities(0, t.numPieces())
935 // Update all piece priorities in one hit. This function should have the same
936 // output as updatePiecePriority, but across all pieces.
937 func (t *Torrent) updatePiecePriorities(begin, end pieceIndex) {
938 for i := begin; i < end; i++ {
939 t.updatePiecePriority(i)
943 // Returns the range of pieces [begin, end) that contains the extent of bytes.
944 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
945 if off >= *t.length {
955 begin = pieceIndex(off / t.info.PieceLength)
956 end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
957 if end > pieceIndex(t.info.NumPieces()) {
958 end = pieceIndex(t.info.NumPieces())
963 // Returns true if all iterations complete without breaking. Returns the read
964 // regions for all readers. The reader regions should not be merged as some
965 // callers depend on this method to enumerate readers.
966 func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool)) (all bool) {
967 for r := range t.readers {
969 if p.begin >= p.end {
972 if !f(p.begin, p.end) {
979 func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
980 prio, ok := t.pendingPieces.GetPriority(bitmap.BitIndex(piece))
982 return PiecePriorityNone
987 ret := piecePriority(-prio)
988 if ret == PiecePriorityNone {
994 func (t *Torrent) pendRequest(req request) {
995 ci := chunkIndex(req.chunkSpec, t.chunkSize)
996 t.pieces[req.Index].pendChunkIndex(ci)
999 func (t *Torrent) pieceCompletionChanged(piece pieceIndex) {
1000 log.Call().Add("piece", piece).AddValue(debugLogValue).Log(t.logger)
1001 t.cl.event.Broadcast()
1002 if t.pieceComplete(piece) {
1003 t.onPieceCompleted(piece)
1005 t.onIncompletePiece(piece)
1007 t.updatePiecePriority(piece)
1010 func (t *Torrent) numReceivedConns() (ret int) {
1011 for c := range t.conns {
1012 if c.Discovery == peerSourceIncoming {
1019 func (t *Torrent) maxHalfOpen() int {
1020 // Note that if we somehow exceed the maximum established conns, we want
1021 // the negative value to have an effect.
1022 establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
1023 extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
1024 // We want to allow some experimentation with new peers, and to try to
1025 // upset an oversupply of received connections.
1026 return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.config.HalfOpenConnsPerTorrent)))
1029 func (t *Torrent) openNewConns() {
1030 defer t.updateWantPeersEvent()
1031 for t.peers.Len() != 0 {
1035 if len(t.halfOpen) >= t.maxHalfOpen() {
1038 p := t.peers.PopMax()
1043 func (t *Torrent) getConnPieceInclination() []int {
1044 _ret := t.connPieceInclinationPool.Get()
1046 pieceInclinationsNew.Add(1)
1047 return rand.Perm(int(t.numPieces()))
1049 pieceInclinationsReused.Add(1)
1050 return *_ret.(*[]int)
1053 func (t *Torrent) putPieceInclination(pi []int) {
1054 t.connPieceInclinationPool.Put(&pi)
1055 pieceInclinationsPut.Add(1)
1058 func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
1059 pcu := t.pieceCompleteUncached(piece)
1060 p := &t.pieces[piece]
1061 changed := t.completedPieces.Get(bitmap.BitIndex(piece)) != pcu.Complete || p.storageCompletionOk != pcu.Ok
1062 log.Fmsg("piece %d completion: %v", piece, pcu.Ok).AddValue(debugLogValue).Log(t.logger)
1063 p.storageCompletionOk = pcu.Ok
1064 t.completedPieces.Set(bitmap.BitIndex(piece), pcu.Complete)
1066 // t.logger.Printf("piece %d uncached completion: %v", piece, pcu.Complete)
1067 // t.logger.Printf("piece %d changed: %v", piece, changed)
1069 t.pieceCompletionChanged(piece)
1074 // Non-blocking read. Client lock is not required.
1075 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1076 p := &t.pieces[off/t.info.PieceLength]
1077 p.waitNoPendingWrites()
1078 return p.Storage().ReadAt(b, off-p.Info().Offset())
1081 func (t *Torrent) updateAllPieceCompletions() {
1082 for i := pieceIndex(0); i < t.numPieces(); i++ {
1083 t.updatePieceCompletion(i)
1087 // Returns an error if the metadata was completed, but couldn't be set for
1088 // some reason. Blame it on the last peer to contribute.
1089 func (t *Torrent) maybeCompleteMetadata() error {
1094 if !t.haveAllMetadataPieces() {
1095 // Don't have enough metadata pieces.
1098 err := t.setInfoBytes(t.metadataBytes)
1100 t.invalidateMetadata()
1101 return fmt.Errorf("error setting info bytes: %s", err)
1103 if t.cl.config.Debug {
1104 t.logger.Printf("%s: got metadata from peers", t)
1109 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1110 t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
1112 now.Add(bitmap.BitIndex(begin))
1113 readahead.AddRange(bitmap.BitIndex(begin)+1, bitmap.BitIndex(end))
1120 func (t *Torrent) needData() bool {
1121 if t.closed.IsSet() {
1127 return t.pendingPieces.Len() != 0
// appendMissingStrings returns old extended with each element of new that
// is not already present in old. Duplicates within new that are absent from
// old are all appended. The result may share backing storage with old.
func appendMissingStrings(old, new []string) (ret []string) {
	ret = old
	for _, n := range new {
		found := false
		for _, o := range old {
			if o == n {
				found = true
				break
			}
		}
		if !found {
			ret = append(ret, n)
		}
	}
	return
}
// appendMissingTrackerTiers returns existing, padded with nil tiers until
// it contains at least minNumTiers tiers. The result may share backing
// storage with existing.
func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
	ret = existing
	for minNumTiers > len(ret) {
		ret = append(ret, nil)
	}
	return
}
1152 func (t *Torrent) addTrackers(announceList [][]string) {
1153 fullAnnounceList := &t.metainfo.AnnounceList
1154 t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1155 for tierIndex, trackerURLs := range announceList {
1156 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1158 t.startMissingTrackerScrapers()
1159 t.updateWantPeersEvent()
1162 // Don't call this before the info is available.
1163 func (t *Torrent) bytesCompleted() int64 {
1167 return t.info.TotalLength() - t.bytesLeft()
1170 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1173 return t.setInfoBytes(b)
1176 // Returns true if connection is removed from torrent.Conns.
1177 func (t *Torrent) deleteConnection(c *connection) (ret bool) {
1178 if !c.closed.IsSet() {
1179 panic("connection is not closed")
1180 // There are behaviours prevented by the closed state that will fail
1181 // if the connection has been deleted.
1185 torrent.Add("deleted connections", 1)
1186 c.deleteAllRequests()
1187 if len(t.conns) == 0 {
1188 t.assertNoPendingRequests()
1193 func (t *Torrent) assertNoPendingRequests() {
1194 if len(t.pendingRequests) != 0 {
1195 panic(t.pendingRequests)
1197 if len(t.lastRequested) != 0 {
1198 panic(t.lastRequested)
1202 func (t *Torrent) dropConnection(c *connection) {
1203 t.cl.event.Broadcast()
1205 if t.deleteConnection(c) {
1210 func (t *Torrent) wantPeers() bool {
1211 if t.closed.IsSet() {
1214 if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
1217 return t.needData() || t.seeding()
1220 func (t *Torrent) updateWantPeersEvent() {
1222 t.wantPeersEvent.Set()
1224 t.wantPeersEvent.Clear()
1228 // Returns whether the client should make effort to seed the torrent.
1229 func (t *Torrent) seeding() bool {
1231 if t.closed.IsSet() {
1234 if cl.config.NoUpload {
1237 if !cl.config.Seed {
1240 if cl.config.DisableAggressiveUpload && t.needData() {
1246 func (t *Torrent) startScrapingTracker(_url string) {
1250 u, err := url.Parse(_url)
1252 // URLs with a leading '*' appear to be a uTorrent convention to
1253 // disable trackers.
1255 log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger)
1259 if u.Scheme == "udp" {
1261 t.startScrapingTracker(u.String())
1263 t.startScrapingTracker(u.String())
1266 if u.Scheme == "udp4" && (t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4) {
1269 if u.Scheme == "udp6" && t.cl.config.DisableIPv6 {
1272 if _, ok := t.trackerAnnouncers[_url]; ok {
1275 newAnnouncer := &trackerScraper{
1279 if t.trackerAnnouncers == nil {
1280 t.trackerAnnouncers = make(map[string]*trackerScraper)
1282 t.trackerAnnouncers[_url] = newAnnouncer
1283 go newAnnouncer.Run()
1286 // Adds and starts tracker scrapers for tracker URLs that aren't already
1288 func (t *Torrent) startMissingTrackerScrapers() {
1289 if t.cl.config.DisableTrackers {
1292 t.startScrapingTracker(t.metainfo.Announce)
1293 for _, tier := range t.metainfo.AnnounceList {
1294 for _, url := range tier {
1295 t.startScrapingTracker(url)
1300 // Returns an AnnounceRequest with fields filled out to defaults and current
1302 func (t *Torrent) announceRequest(event tracker.AnnounceEvent) tracker.AnnounceRequest {
1303 // Note that IPAddress is not set. It's set for UDP inside the tracker
1304 // code, since it's dependent on the network in use.
1305 return tracker.AnnounceRequest{
1308 Port: uint16(t.cl.incomingPeerPort()),
1309 PeerId: t.cl.peerID,
1310 InfoHash: t.infoHash,
1311 Key: t.cl.announceKey(),
1313 // The following are vaguely described in BEP 3.
1315 Left: t.bytesLeftAnnounce(),
1316 Uploaded: t.stats.BytesWrittenData.Int64(),
1317 // There's no mention of wasted or unwanted download in the BEP.
1318 Downloaded: t.stats.BytesReadUsefulData.Int64(),
1322 // Adds peers revealed in an announce until the announce ends, or we have
1324 func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
1326 for v := range pvs {
1328 for _, cp := range v.Peers {
1330 // Can't do anything with this.
1336 Source: peerSourceDHTGetPeers,
1343 func (t *Torrent) announceToDht(impliedPort bool, s *dht.Server) error {
1344 ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), impliedPort)
1348 go t.consumeDhtAnnouncePeers(ps.Peers)
1350 case <-t.closed.LockedChan(t.cl.locker()):
1351 case <-time.After(5 * time.Minute):
1357 func (t *Torrent) dhtAnnouncer(s *dht.Server) {
1361 case <-t.closed.LockedChan(cl.locker()):
1363 case <-t.wantPeersEvent.LockedChan(cl.locker()):
1368 err := t.announceToDht(true, s)
1370 t.logger.Printf("error announcing %q to DHT: %s", t, err)
1375 func (t *Torrent) addPeers(peers []Peer) {
1376 for _, p := range peers {
1381 func (t *Torrent) Stats() TorrentStats {
1383 defer t.cl.rUnlock()
1384 return t.statsLocked()
1387 func (t *Torrent) statsLocked() (ret TorrentStats) {
1388 ret.ActivePeers = len(t.conns)
1389 ret.HalfOpenPeers = len(t.halfOpen)
1390 ret.PendingPeers = t.peers.Len()
1391 ret.TotalPeers = t.numTotalPeers()
1392 ret.ConnectedSeeders = 0
1393 for c := range t.conns {
1394 if all, ok := c.peerHasAllPieces(); all && ok {
1395 ret.ConnectedSeeders++
1398 ret.ConnStats = t.stats.Copy()
1402 // The total number of peers in the torrent.
1403 func (t *Torrent) numTotalPeers() int {
1404 peers := make(map[string]struct{})
1405 for conn := range t.conns {
1406 ra := conn.conn.RemoteAddr()
1408 // It's been closed and doesn't support RemoteAddr.
1411 peers[ra.String()] = struct{}{}
1413 for addr := range t.halfOpen {
1414 peers[addr] = struct{}{}
1416 t.peers.Each(func(peer Peer) {
1417 peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{}
1422 // Reconcile bytes transferred before connection was associated with a
// torrent: fold the connection's handshake-phase counters into the
// torrent-level aggregates exactly once.
1424 func (t *Torrent) reconcileHandshakeStats(c *connection) {
// Sanity check: during the handshake phase only raw byte counters should
// have moved. NOTE(review): the violation branch (presumably a panic) is on
// elided lines below this comparison.
1425 if c.stats != (ConnStats{
1426 // Handshakes should only increment these fields:
1427 BytesWritten: c.stats.BytesWritten,
1428 BytesRead: c.stats.BytesRead,
// Propagate the handshake bytes up through the post-handshake stat sinks.
1432 c.postHandshakeStats(func(cs *ConnStats) {
1433 cs.BytesRead.Add(c.stats.BytesRead.Int64())
1434 cs.BytesWritten.Add(c.stats.BytesWritten.Int64())
// Mark done so the reconciliation cannot be applied twice.
1436 c.reconciledHandshakeStats = true
1439 // Adds the connection to the torrent; a non-nil error explains why it was rejected.
1440 func (t *Torrent) addConnection(c *connection) (err error) {
1443 torrent.Add("added connections", 1)
// Never admit connections to a closed torrent.
1446 if t.closed.IsSet() {
1447 return errors.New("torrent closed")
// Duplicate-peer-ID handling: compare against every established connection.
1449 for c0 := range t.conns {
1450 if c.PeerID != c0.PeerID {
1453 if !t.cl.config.dropDuplicatePeerIds {
// Prefer whichever side uses the better network; drop the loser.
1456 if left, ok := c.hasPreferredNetworkOver(c0); ok && left {
1458 t.deleteConnection(c0)
1460 return errors.New("existing connection preferred")
// At the connection cap, evict the worst "bad" connection to make room.
1463 if len(t.conns) >= t.maxEstablishedConns {
// NOTE(review): this `c` shadows the parameter `c` until the end of the if
// block — easy to misread, but intentional here (it names the eviction victim).
1464 c := t.worstBadConn()
1466 return errors.New("don't want conns")
1469 t.deleteConnection(c)
1471 if len(t.conns) >= t.maxEstablishedConns {
// Admit the connection (conns is a set keyed by *connection).
1474 t.conns[c] = struct{}{}
// wantConns reports whether the torrent would accept another connection:
// networking must be on, the torrent open, and it must either be seeding or
// still need data. Below the cap we always want more; at the cap we only
// want one if there is a bad connection we could replace.
1478 func (t *Torrent) wantConns() bool {
1479 if !t.networkingEnabled {
1482 if t.closed.IsSet() {
1485 if !t.seeding() && !t.needData() {
1488 if len(t.conns) < t.maxEstablishedConns {
// At capacity: worth a new conn only if an existing one is droppable.
1491 return t.worstBadConn() != nil
// SetMaxEstablishedConns sets the connection cap and returns the previous
// value. If the new cap is lower, the worst connections are dropped until
// the torrent fits under it.
1494 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1497 oldMax = t.maxEstablishedConns
1498 t.maxEstablishedConns = max
// Heapify all connections by "worseness" so we evict worst-first.
1499 wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
1500 for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1501 t.dropConnection(wcs.Pop().(*connection))
// pieceHashed records the outcome of hashing a piece: on success it credits
// the contributing connections and marks the piece complete in storage; on
// failure it penalizes the contributors, may ban the least-trusted peer's
// IP, and returns the piece to the incomplete state.
1507 func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) {
1508 log.Fmsg("hashed piece %d", piece).Add("piece", piece).Add("passed", correct).AddValue(debugLogValue).Log(t.logger)
// Nothing to do if the torrent was closed while hashing.
1509 if t.closed.IsSet() {
1512 p := &t.pieces[piece]
// Collect (and clear) the set of connections that wrote data to this piece.
1513 touchers := t.reapPieceTouchers(piece)
1514 if p.storageCompletionOk {
1515 // Don't score the first time a piece is hashed, it could be an
1518 pieceHashedCorrect.Add(1)
1520 log.Fmsg("piece failed hash: %d connections contributed", len(touchers)).AddValues(t, p).Log(t.logger)
1521 pieceHashedNotCorrect.Add(1)
1525 if len(touchers) != 0 {
1526 // Don't increment stats above connection-level for every involved
// Credit the torrent/client aggregates once, then each toucher individually.
1528 t.allStats((*ConnStats).incrementPiecesDirtiedGood)
1530 for _, c := range touchers {
1531 c.stats.incrementPiecesDirtiedGood()
// Persist completion; storage errors are logged, not fatal.
1533 err := p.Storage().MarkComplete()
1535 t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
// Failed hash: penalize every connection that contributed data.
1538 if len(touchers) != 0 {
1539 // Don't increment stats above connection-level for every involved
1541 t.allStats((*ConnStats).incrementPiecesDirtiedBad)
1542 for _, c := range touchers {
1543 // Y u do dis peer?!
1544 c.stats.incrementPiecesDirtiedBad()
// Order touchers least-trusted first so the ban below hits the worst peer.
1546 slices.Sort(touchers, connLessTrusted)
1547 if t.cl.config.Debug {
1548 t.logger.Printf("dropping first corresponding conn from trust: %v", func() (ret []int64) {
1549 for _, c := range touchers {
1550 ret = append(ret, c.netGoodPiecesDirtied())
// NOTE(review): the selection of `c` for the ban is on elided lines —
// presumably the least-trusted toucher; confirm against the full file.
1556 t.cl.banPeerIP(c.remoteAddr.IP)
// Roll the piece back to incomplete so it can be re-downloaded.
1559 t.onIncompletePiece(piece)
1560 p.Storage().MarkNotComplete()
1562 t.updatePieceCompletion(piece)
// cancelRequestsForPiece walks every connection so outstanding requests for
// the given piece can be cancelled (per-connection handling is on elided
// lines inside the loop).
1565 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
1566 // TODO: Make faster
1567 for cn := range t.conns {
// onPieceCompleted reacts to a piece becoming complete: re-pend its chunk
// specs, cancel any in-flight requests for it, and notify every connection
// (the per-connection action, presumably a Have message, is elided).
1572 func (t *Torrent) onPieceCompleted(piece pieceIndex) {
1573 t.pendAllChunkSpecs(piece)
1574 t.cancelRequestsForPiece(piece)
1575 for conn := range t.conns {
1580 // Called when a piece is found to be not complete.
1581 func (t *Torrent) onIncompletePiece(piece pieceIndex) {
// If every chunk of the piece is dirty, reset them all to pending.
1582 if t.pieceAllDirty(piece) {
1583 t.pendAllChunkSpecs(piece)
// If the piece isn't wanted, there is nothing further to schedule.
1585 if !t.wantPieceIndex(piece) {
1586 // t.logger.Printf("piece %d incomplete and unwanted", piece)
1589 // We could drop any connections that we told we have a piece that we
1590 // don't here. But there's a test failure, and it seems clients don't care
1591 // if you request pieces that you already claim to have. Pruning bad
1592 // connections might just remove any connections that aren't treating us
1593 // favourably anyway.
1595 // for c := range t.conns {
1596 // if c.sentHave(piece) {
// Wake up requesting on every peer that can serve this piece.
1600 for conn := range t.conns {
1601 if conn.PeerHasPiece(piece) {
1602 conn.updateRequests()
// verifyPiece hashes a queued piece and reports the result via pieceHashed.
// It waits for any in-progress hash of the piece and for storage to be
// available before proceeding. Runs as its own goroutine (see
// queuePieceCheck); the locking that brackets this body is on elided lines.
1607 func (t *Torrent) verifyPiece(piece pieceIndex) {
1611 p := &t.pieces[piece]
1614 cl.event.Broadcast()
// Block while someone else is hashing this piece or storage isn't open yet.
1616 for p.hashing || t.storage == nil {
// The piece must have been queued by queuePieceCheck; anything else is a bug.
1619 if !p.t.piecesQueuedForHash.Remove(bitmap.BitIndex(piece)) {
1620 panic("piece was not queued")
1622 t.updatePiecePriority(piece)
// Bail out without hashing if the torrent closed while we waited.
1623 if t.closed.IsSet() {
1627 t.publishPieceChange(piece)
1628 t.updatePiecePriority(piece)
// Read-lock storage only for the duration of the hash itself.
1629 t.storageLock.RLock()
1631 sum := t.hashPiece(piece)
1632 t.storageLock.RUnlock()
1635 t.updatePiecePriority(piece)
// Compare the computed hash against the piece's expected hash.
1636 t.pieceHashed(piece, sum == *p.hash)
1637 t.publishPieceChange(piece)
1640 // Return the connections that touched a piece, and clear the entries while
// doing so, so each toucher is only ever scored once per hash attempt.
1642 func (t *Torrent) reapPieceTouchers(piece pieceIndex) (ret []*connection) {
// Remove the back-reference from each connection as we collect it.
1643 for c := range t.pieces[piece].dirtiers {
1644 delete(c.peerTouchedPieces, piece)
1645 ret = append(ret, c)
// Drop the piece's toucher set entirely.
1647 t.pieces[piece].dirtiers = nil
// connsAsSlice copies the connection set into a slice (iteration order is
// map-random).
1651 func (t *Torrent) connsAsSlice() (ret []*connection) {
1652 for c := range t.conns {
1653 ret = append(ret, c)
1658 // Currently doesn't really queue, but should in the future.
// Marks the piece as queued for hashing, publishes the state change, and
// spawns a goroutine to verify it. Idempotent: re-queueing a piece that is
// already queued is a no-op.
1659 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
1660 piece := &t.pieces[pieceIndex]
1661 if piece.queuedForHash() {
1664 t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
1665 t.publishPieceChange(pieceIndex)
1666 t.updatePiecePriority(pieceIndex)
// One verification goroutine per queued piece.
1667 go t.verifyPiece(pieceIndex)
1670 // Forces all the pieces to be re-hashed. See also Piece.VerifyData.
1671 func (t *Torrent) VerifyData() {
1672 for i := pieceIndex(0); i < t.NumPieces(); i++ {
1673 t.Piece(i).VerifyData()
1677 // Start the process of connecting to the given peer for the given torrent if
// appropriate: skips ourselves, banned addresses, and addresses we are
// already connected (or connecting) to, then dials asynchronously.
1679 func (t *Torrent) initiateConn(peer Peer) {
// Never dial ourselves.
1680 if peer.Id == t.cl.peerID {
// Skip peers whose IP/port the client has banned.
1683 if t.cl.badPeerIPPort(peer.IP, peer.Port) {
1686 addr := IpPort{peer.IP, uint16(peer.Port)}
// Already connected or half-open to this address: nothing to do.
1687 if t.addrActive(addr.String()) {
// Record the dial as half-open, then connect in the background.
1690 t.halfOpen[addr.String()] = peer
1691 go t.cl.outgoingConnection(t, addr, peer.Source)
// AddClientPeer adds another Client's listen addresses as peers of this
// torrent — useful for connecting two local clients to each other.
1694 func (t *Torrent) AddClientPeer(cl *Client) {
1695 t.AddPeers(func() (ps []Peer) {
// One Peer per listen address of the other client.
1696 for _, la := range cl.ListenAddrs() {
1697 ps = append(ps, Peer{
1698 IP: missinggo.AddrIP(la),
1699 Port: missinggo.AddrPort(la),
1706 // All stats that include this Torrent. Useful when we want to increment
1707 // ConnStats but not for every connection.
// NOTE(review): the body (applying f to the torrent- and client-level
// ConnStats) is on elided lines.
1708 func (t *Torrent) allStats(f func(*ConnStats)) {
// hashingPiece reports whether the given piece is currently being hashed.
1713 func (t *Torrent) hashingPiece(i pieceIndex) bool {
1714 return t.pieces[i].hashing
// pieceQueuedForHash reports whether the given piece is waiting in the
// hash-verification queue (see queuePieceCheck).
1717 func (t *Torrent) pieceQueuedForHash(i pieceIndex) bool {
1718 return t.piecesQueuedForHash.Get(bitmap.BitIndex(i))
1721 func (t *Torrent) dialTimeout() time.Duration {
1722 return reducedDialTimeout(t.cl.config.MinDialTimeout, t.cl.config.NominalDialTimeout, t.cl.config.HalfOpenConnsPerTorrent, t.peers.Len())