19 "github.com/anacrolix/dht"
20 "github.com/anacrolix/log"
21 "github.com/anacrolix/missinggo"
22 "github.com/anacrolix/missinggo/bitmap"
23 "github.com/anacrolix/missinggo/perf"
24 "github.com/anacrolix/missinggo/prioritybitmap"
25 "github.com/anacrolix/missinggo/pubsub"
26 "github.com/anacrolix/missinggo/slices"
27 "github.com/bradfitz/iter"
28 "github.com/davecgh/go-spew/spew"
30 "github.com/anacrolix/torrent/bencode"
31 "github.com/anacrolix/torrent/metainfo"
32 pp "github.com/anacrolix/torrent/peer_protocol"
33 "github.com/anacrolix/torrent/storage"
34 "github.com/anacrolix/torrent/tracker"
// chunkIndexSpec maps a chunk index within the given piece to its chunkSpec,
// using the piece's length and the torrent's configured chunk size.
37 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
38 return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
41 type peersKey struct {
46 // Maintains state of torrent within a Client.
48 // Torrent-level aggregate statistics. First in struct to ensure 64-bit
49 // alignment. See #262.
54 networkingEnabled bool
56 // Determines what chunks to request from peers. 1: Favour higher priority
57 // pieces with some fuzzing to reduce overlaps and wastage across
58 // connections. 2: The fastest connection downloads strictly in order of
59 // priority, while all others adhere to their piece inclinations. 3:
60 // Requests are strictly by piece priority, and not duplicated until
61 // duplicateRequestTimeout is reached.
63 // How long to avoid duplicating a pending request.
64 duplicateRequestTimeout time.Duration
66 closed missinggo.Event
67 infoHash metainfo.Hash
69 // Values are the piece indices that changed.
70 pieceStateChanges *pubsub.PubSub
71 // The size of chunks to request from peers over the wire. This is
72 // normally 16KiB by convention these days.
75 // Total length of the torrent in bytes. Stored because it's not O(1) to
76 // get this from the info dict.
79 // The storage to open when the info dict becomes available.
80 storageOpener *storage.Client
81 // Storage for torrent data.
82 storage *storage.Torrent
83 // Read-locked for using storage, and write-locked for Closing.
84 storageLock sync.RWMutex
86 // TODO: Only announce stuff is used?
87 metainfo metainfo.MetaInfo
89 // The info dict. nil if we don't have it (yet).
93 // Active peer connections, running message stream loops. TODO: Make this
94 // open (not-closed) connections only.
95 conns map[*connection]struct{}
96 maxEstablishedConns int
97 // Set of addrs to which we're attempting to connect. Connections are
98 // half-open until all handshakes are completed.
99 halfOpen map[string]Peer
100 fastestConn *connection
102 // Reserve of peers to connect to. A peer can be both here and in the
103 // active connections if we're told about the peer after connecting with
104 // them. That encourages us to reconnect to peers that are well known in
106 peers prioritizedPeers
107 wantPeersEvent missinggo.Event
108 // An announcer for each tracker URL.
109 trackerAnnouncers map[string]*trackerScraper
110 // How many times we've initiated a DHT announce. TODO: Move into stats.
113 // Name used if the info name isn't available. Should be cleared when the
114 // Info does become available.
117 // The bencoded bytes of the info dict. This is actively manipulated if
118 // the info bytes aren't initially available, and we try to fetch them
121 // Each element corresponds to the 16KiB metadata pieces. If true, we have
122 // received that piece.
123 metadataCompletedChunks []bool
124 metadataChanged sync.Cond
126 // Set when .Info is obtained.
127 gotMetainfo missinggo.Event
129 readers map[*reader]struct{}
130 readerNowPieces bitmap.Bitmap
131 readerReadaheadPieces bitmap.Bitmap
133 // A cache of pieces we need to get. Calculated from various piece and
134 // file priorities and completion states elsewhere.
135 pendingPieces prioritybitmap.PriorityBitmap
136 // A cache of completed piece indices.
137 completedPieces bitmap.Bitmap
138 // Pieces that need to be hashed.
139 piecesQueuedForHash bitmap.Bitmap
141 // A pool of piece priorities []int for assignment to new connections.
142 // These "inclinations" are used to give connections preference for
144 connPieceInclinationPool sync.Pool
146 // Count of each request across active connections.
147 pendingRequests map[request]int
148 // The last time we requested a chunk. Deleting the request from any
149 // connection will clear this value.
150 lastRequested map[request]time.Time
// tickleReaders wakes goroutines blocked on the client-wide event (e.g.
// readers waiting for piece availability to change).
153 func (t *Torrent) tickleReaders() {
154 t.cl.event.Broadcast()
157 // Returns a channel that is closed when the Torrent is closed.
158 func (t *Torrent) Closed() <-chan struct{} {
159 return t.closed.LockedChan(&t.cl.mu)
162 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
163 // pending, and half-open peers.
164 func (t *Torrent) KnownSwarm() (ks []Peer) {
165 // Add pending peers to the list
166 t.peers.Each(func(peer Peer) {
167 ks = append(ks, peer)
170 // Add half-open peers to the list
171 for _, peer := range t.halfOpen {
172 ks = append(ks, peer)
175 // Add active peers to the list
176 for conn := range t.conns {
177 host, portString, err := net.SplitHostPort(conn.remoteAddr().String())
182 ip := net.ParseIP(host)
183 port, err := strconv.Atoi(portString)
188 ks = append(ks, Peer{
192 Source: conn.Discovery,
193 // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
194 // > But if we're not connected to them with an encrypted connection, I couldn't say
195 // > what's appropriate. We can carry forward the SupportsEncryption value as we
196 // > received it from trackers/DHT/PEX, or just use the encryption state for the
197 // > connection. It's probably easiest to do the latter for now.
198 // https://github.com/anacrolix/torrent/pull/188
199 SupportsEncryption: conn.headerEncrypted,
206 func (t *Torrent) setChunkSize(size pp.Integer) {
208 t.chunkPool = &sync.Pool{
209 New: func() interface{} {
210 b := make([]byte, size)
216 func (t *Torrent) setDisplayName(dn string) {
223 func (t *Torrent) pieceComplete(piece int) bool {
224 return t.completedPieces.Get(piece)
227 func (t *Torrent) pieceCompleteUncached(piece int) storage.Completion {
228 return t.pieces[piece].Storage().Completion()
231 // There's a connection to that address already.
232 func (t *Torrent) addrActive(addr string) bool {
233 if _, ok := t.halfOpen[addr]; ok {
236 for c := range t.conns {
241 if ra.String() == addr {
248 func (t *Torrent) unclosedConnsAsSlice() (ret []*connection) {
249 ret = make([]*connection, 0, len(t.conns))
250 for c := range t.conns {
251 if !c.closed.IsSet() {
258 func (t *Torrent) addPeer(p Peer) {
260 peersAddedBySource.Add(string(p.Source), 1)
261 if t.closed.IsSet() {
264 if cl.badPeerIPPort(p.IP, p.Port) {
265 torrent.Add("peers not added because of bad addr", 1)
269 torrent.Add("peers replaced", 1)
272 for t.peers.Len() > cl.config.TorrentPeersHighWater {
273 _, ok := t.peers.DeleteMin()
275 torrent.Add("excess reserve peers discarded", 1)
280 func (t *Torrent) invalidateMetadata() {
281 for i := range t.metadataCompletedChunks {
282 t.metadataCompletedChunks[i] = false
287 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
291 if index >= len(t.metadataCompletedChunks) {
292 log.Printf("%s: ignoring metadata piece %d", t, index)
295 copy(t.metadataBytes[(1<<14)*index:], data)
296 t.metadataCompletedChunks[index] = true
// Metadata is exchanged in 16KiB (1<<14) pieces; the count rounds up so a
// partial final piece still counts.
299 func (t *Torrent) metadataPieceCount() int {
300 return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
303 func (t *Torrent) haveMetadataPiece(piece int) bool {
305 return (1<<14)*piece < len(t.metadataBytes)
307 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
311 func (t *Torrent) metadataSizeKnown() bool {
312 return t.metadataBytes != nil
315 func (t *Torrent) metadataSize() int {
316 return len(t.metadataBytes)
// infoPieceHashes splits the concatenated SHA-1 digests in info.Pieces into
// one string per piece.
319 func infoPieceHashes(info *metainfo.Info) (ret []string) {
320 for i := 0; i < len(info.Pieces); i += sha1.Size {
321 ret = append(ret, string(info.Pieces[i:i+sha1.Size]))
// makePieces allocates the piece table from the info dict's hashes and wires
// each piece to the files it overlaps.
326 func (t *Torrent) makePieces() {
327 hashes := infoPieceHashes(t.info)
328 t.pieces = make([]Piece, len(hashes))
329 for i, hash := range hashes {
330 piece := &t.pieces[i]
333 piece.noPendingWrites.L = &piece.pendingWritesMutex
334 missinggo.CopyExact(piece.hash[:], hash)
336 beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
337 endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
338 piece.files = files[beginFile:endFile]
342 // Returns the index of the first file containing the piece. files must be
343 // ordered by offset.
344 func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
345 for i, f := range files {
346 if f.offset+f.length > pieceOffset {
353 // Returns the index after the last file containing the piece. files must be
354 // ordered by offset.
355 func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
356 for i, f := range files {
357 if f.offset+f.length >= pieceEndOffset {
364 func (t *Torrent) cacheLength() {
366 for _, f := range t.info.UpvertedFiles() {
372 func (t *Torrent) setInfo(info *metainfo.Info) error {
373 if err := validateInfo(info); err != nil {
374 return fmt.Errorf("bad info: %s", err)
376 if t.storageOpener != nil {
378 t.storage, err = t.storageOpener.OpenTorrent(info, t.infoHash)
380 return fmt.Errorf("error opening torrent storage: %s", err)
384 t.displayName = "" // Save a few bytes lol.
391 func (t *Torrent) onSetInfo() {
392 for conn := range t.conns {
393 if err := conn.setNumPieces(t.numPieces()); err != nil {
394 log.Printf("closing connection: %s", err)
398 for i := range t.pieces {
399 t.updatePieceCompletion(i)
401 if !p.storageCompletionOk {
402 // log.Printf("piece %s completion unknown, queueing check", p)
406 t.cl.event.Broadcast()
408 t.updateWantPeersEvent()
409 t.pendingRequests = make(map[request]int)
410 t.lastRequested = make(map[request]time.Time)
413 // Called when metadata for a torrent becomes available.
414 func (t *Torrent) setInfoBytes(b []byte) error {
415 if metainfo.HashBytes(b) != t.infoHash {
416 return errors.New("info bytes have wrong hash")
418 var info metainfo.Info
419 if err := bencode.Unmarshal(b, &info); err != nil {
420 return fmt.Errorf("error unmarshalling info bytes: %s", err)
422 if err := t.setInfo(&info); err != nil {
426 t.metadataCompletedChunks = nil
// haveAllMetadataPieces reports whether every 16KiB metadata chunk has been
// received (a nil chunk list means the size isn't known yet).
431 func (t *Torrent) haveAllMetadataPieces() bool {
435 if t.metadataCompletedChunks == nil {
438 for _, have := range t.metadataCompletedChunks {
446 // TODO: Propagate errors to disconnect peer.
447 func (t *Torrent) setMetadataSize(bytes int64) (err error) {
449 // We already know the correct metadata size.
452 if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
453 return errors.New("bad size")
455 if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
458 t.metadataBytes = make([]byte, bytes)
459 t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
460 t.metadataChanged.Broadcast()
461 for c := range t.conns {
462 c.requestPendingMetadata()
467 // The current working name for the torrent. Either the name in the info dict,
468 // or a display name given such as by the dn value in a magnet link, or "".
469 func (t *Torrent) name() string {
476 func (t *Torrent) pieceState(index int) (ret PieceState) {
477 p := &t.pieces[index]
478 ret.Priority = t.piecePriority(index)
479 ret.Completion = p.completion()
480 if p.queuedForHash() || p.hashing {
483 if !ret.Complete && t.piecePartiallyDownloaded(index) {
489 func (t *Torrent) metadataPieceSize(piece int) int {
490 return metadataPieceSize(len(t.metadataBytes), piece)
493 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
499 d["total_size"] = len(t.metadataBytes)
501 p, err := bencode.Marshal(d)
507 ExtendedID: c.PeerExtensionIDs["ut_metadata"],
508 ExtendedPayload: append(p, data...),
512 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
513 rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
514 ret = append(ret, PieceStateRun{
515 PieceState: el.(PieceState),
519 for index := range t.pieces {
520 rle.Append(t.pieceState(index), 1)
526 // Produces a small string representing a PieceStateRun.
527 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
528 ret = fmt.Sprintf("%d", psr.Length)
529 ret += func() string {
530 switch psr.Priority {
531 case PiecePriorityNext:
533 case PiecePriorityNormal:
535 case PiecePriorityReadahead:
537 case PiecePriorityNow:
539 case PiecePriorityHigh:
// writeStatus dumps a human-readable status report (metadata, piece states,
// trackers, stats, connections) to w. Intended for diagnostics.
560 func (t *Torrent) writeStatus(w io.Writer) {
561 fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
562 fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
564 fmt.Fprintf(w, "Metadata have: ")
565 for _, h := range t.metadataCompletedChunks {
566 fmt.Fprintf(w, "%c", func() rune {
576 fmt.Fprintf(w, "Piece length: %s\n", func() string {
578 return fmt.Sprint(t.usualPieceSize())
584 fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
585 fmt.Fprint(w, "Piece States:")
586 for _, psr := range t.pieceStateRuns() {
588 w.Write([]byte(pieceStateRunStatusChars(psr)))
592 fmt.Fprintf(w, "Reader Pieces:")
593 t.forReaderOffsetPieces(func(begin, end int) (again bool) {
594 fmt.Fprintf(w, " %d:%d", begin, end)
599 fmt.Fprintf(w, "Enabled trackers:\n")
601 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
602 fmt.Fprintf(tw, " URL\tNext announce\tLast announce\n")
// Sort scrapers by URL for deterministic output.
603 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
604 return l.u.String() < r.u.String()
605 }).([]*trackerScraper) {
606 fmt.Fprintf(tw, " %s\n", ta.statusLine())
611 fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
613 spew.NewDefaultConfig()
614 spew.Fdump(w, t.statsLocked())
616 conns := t.connsAsSlice()
617 slices.Sort(conns, worseConn)
618 for i, c := range conns {
619 fmt.Fprintf(w, "%2d. ", i+1)
624 func (t *Torrent) haveInfo() bool {
628 // Returns a run-time generated MetaInfo that includes the info bytes and
629 // announce-list as currently known to the client.
630 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
631 return metainfo.MetaInfo{
632 CreationDate: time.Now().Unix(),
633 Comment: "dynamic metainfo from client",
634 CreatedBy: "go.torrent",
635 AnnounceList: t.metainfo.UpvertedAnnounceList(),
636 InfoBytes: func() []byte {
638 return t.metadataBytes
646 func (t *Torrent) BytesMissing() int64 {
648 defer t.mu().RUnlock()
649 return t.bytesMissingLocked()
652 func (t *Torrent) bytesMissingLocked() int64 {
// bytesLeft sums the undownloaded bytes over all incomplete pieces (complete
// pieces are flipped out of the iteration entirely).
656 func (t *Torrent) bytesLeft() (left int64) {
657 bitmap.Flip(t.completedPieces, 0, t.numPieces()).IterTyped(func(piece int) bool {
658 p := &t.pieces[piece]
659 left += int64(p.length() - p.numDirtyBytes())
665 // Bytes left to give in tracker announces.
666 func (t *Torrent) bytesLeftAnnounce() uint64 {
668 return uint64(t.bytesLeft())
670 return math.MaxUint64
674 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
675 if t.pieceComplete(piece) {
678 if t.pieceAllDirty(piece) {
681 return t.pieces[piece].hasDirtyChunks()
684 func (t *Torrent) usualPieceSize() int {
685 return int(t.info.PieceLength)
688 func (t *Torrent) numPieces() int {
689 return t.info.NumPieces()
692 func (t *Torrent) numPiecesCompleted() (num int) {
693 return t.completedPieces.Len()
696 func (t *Torrent) close() (err error) {
699 if t.storage != nil {
702 t.storageLock.Unlock()
704 for conn := range t.conns {
707 t.cl.event.Broadcast()
708 t.pieceStateChanges.Close()
709 t.updateWantPeersEvent()
713 func (t *Torrent) requestOffset(r request) int64 {
714 return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
717 // Return the request that would include the given offset into the torrent
718 // data. Returns !ok if there is no such request.
719 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
720 return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
723 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
724 defer perf.ScopeTimerErr(&err)()
725 n, err := t.pieces[piece].Storage().WriteAt(data, begin)
726 if err == nil && n != len(data) {
727 err = io.ErrShortWrite
732 func (t *Torrent) bitfield() (bf []bool) {
733 bf = make([]bool, t.numPieces())
734 t.completedPieces.IterTyped(func(piece int) (again bool) {
// pieceNumChunks rounds up so a short final chunk still counts.
741 func (t *Torrent) pieceNumChunks(piece int) int {
742 return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
745 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
746 t.pieces[pieceIndex].dirtyChunks.Clear()
749 func (t *Torrent) pieceLength(piece int) pp.Integer {
750 if t.info.PieceLength == 0 {
751 // There will be no variance amongst pieces. Only pain.
754 if piece == t.numPieces()-1 {
755 ret := pp.Integer(*t.length % t.info.PieceLength)
760 return pp.Integer(t.info.PieceLength)
// hashPiece computes the hash of a piece's data from storage, waiting for any
// pending writes to that piece to settle first.
763 func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
764 hash := pieceHash.New()
765 p := &t.pieces[piece]
766 p.waitNoPendingWrites()
767 ip := t.info.Piece(piece)
769 n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
771 missinggo.CopyExact(&ret, hash.Sum(nil))
774 if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
775 log.Printf("unexpected error hashing piece with %T: %s", t.storage.TorrentImpl, err)
780 func (t *Torrent) haveAnyPieces() bool {
781 return t.completedPieces.Len() != 0
784 func (t *Torrent) haveAllPieces() bool {
788 return t.completedPieces.Len() == t.numPieces()
791 func (t *Torrent) havePiece(index int) bool {
792 return t.haveInfo() && t.pieceComplete(index)
795 func (t *Torrent) haveChunk(r request) (ret bool) {
797 // log.Println("have chunk", r, ret)
802 if t.pieceComplete(int(r.Index)) {
805 p := &t.pieces[r.Index]
806 return !p.pendingChunk(r.chunkSpec, t.chunkSize)
809 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
810 return int(cs.Begin / chunkSize)
813 func (t *Torrent) wantPiece(r request) bool {
814 if !t.wantPieceIndex(int(r.Index)) {
817 if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
820 // TODO: What about pieces that were wanted, but aren't now, and aren't
821 // completed either? That used to be done here.
825 func (t *Torrent) wantPieceIndex(index int) bool {
829 if index < 0 || index >= t.numPieces() {
832 p := &t.pieces[index]
833 if p.queuedForHash() {
839 if t.pieceComplete(index) {
842 if t.pendingPieces.Contains(index) {
845 // log.Printf("piece %d not pending", index)
846 return !t.forReaderOffsetPieces(func(begin, end int) bool {
847 return index < begin || index >= end
851 // The worst connection is one that hasn't been sent, or sent anything useful
852 // for the longest. A bad connection is one that usually sends us unwanted
853 // pieces, or has been in the worse half of the established connections for more
855 func (t *Torrent) worstBadConn() *connection {
856 wcs := worseConnSlice{t.unclosedConnsAsSlice()}
859 c := heap.Pop(&wcs).(*connection)
860 if c.stats.ChunksReadWasted.Int64() >= 6 && c.stats.ChunksReadWasted.Int64() > c.stats.ChunksReadUseful.Int64() {
863 // If the connection is in the worst half of the established
864 // connection quota and is older than a minute.
865 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
866 // Give connections 1 minute to prove themselves.
867 if time.Since(c.completedHandshake) > time.Minute {
875 type PieceStateChange struct {
// publishPieceChange publishes a PieceStateChange only when the piece's
// public state actually differs from the cached copy, avoiding no-op events.
880 func (t *Torrent) publishPieceChange(piece int) {
881 cur := t.pieceState(piece)
882 p := &t.pieces[piece]
883 if cur != p.publicPieceState {
884 p.publicPieceState = cur
885 t.pieceStateChanges.Publish(PieceStateChange{
892 func (t *Torrent) pieceNumPendingChunks(piece int) int {
893 if t.pieceComplete(piece) {
896 return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
899 func (t *Torrent) pieceAllDirty(piece int) bool {
900 return t.pieces[piece].dirtyChunks.Len() == t.pieceNumChunks(piece)
903 func (t *Torrent) readersChanged() {
904 t.updateReaderPieces()
905 t.updateAllPiecePriorities()
908 func (t *Torrent) updateReaderPieces() {
909 t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities()
912 func (t *Torrent) readerPosChanged(from, to pieceRange) {
916 t.updateReaderPieces()
917 // Order the ranges, high and low.
919 if l.begin > h.begin {
923 // Two distinct ranges.
924 t.updatePiecePriorities(l.begin, l.end)
925 t.updatePiecePriorities(h.begin, h.end)
932 t.updatePiecePriorities(l.begin, end)
936 func (t *Torrent) maybeNewConns() {
937 // Tickle the accept routine.
938 t.cl.event.Broadcast()
942 func (t *Torrent) piecePriorityChanged(piece int) {
943 // log.Printf("piece %d priority changed", piece)
944 for c := range t.conns {
945 if c.updatePiecePriority(piece) {
946 // log.Print("conn piece priority changed")
951 t.publishPieceChange(piece)
954 func (t *Torrent) updatePiecePriority(piece int) {
955 p := &t.pieces[piece]
956 newPrio := p.uncachedPriority()
957 // log.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
958 if newPrio == PiecePriorityNone {
959 if !t.pendingPieces.Remove(piece) {
963 if !t.pendingPieces.Set(piece, newPrio.BitmapPriority()) {
967 t.piecePriorityChanged(piece)
970 func (t *Torrent) updateAllPiecePriorities() {
971 t.updatePiecePriorities(0, len(t.pieces))
974 // Update all piece priorities in one hit. This function should have the same
975 // output as updatePiecePriority, but across all pieces.
976 func (t *Torrent) updatePiecePriorities(begin, end int) {
977 for i := begin; i < end; i++ {
978 t.updatePiecePriority(i)
982 // Returns the range of pieces [begin, end) that contains the extent of bytes.
983 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
984 if off >= *t.length {
994 begin = int(off / t.info.PieceLength)
995 end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
996 if end > t.info.NumPieces() {
997 end = t.info.NumPieces()
1002 // Returns true if all iterations complete without breaking. Returns the read
1003 // regions for all readers. The reader regions should not be merged as some
1004 // callers depend on this method to enumerate readers.
1005 func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
1006 for r := range t.readers {
1008 if p.begin >= p.end {
1011 if !f(p.begin, p.end) {
1018 func (t *Torrent) piecePriority(piece int) piecePriority {
1019 prio, ok := t.pendingPieces.GetPriority(piece)
1021 return PiecePriorityNone
// Bitmap priorities are stored negated; invert to recover the piece priority.
1026 ret := piecePriority(-prio)
1027 if ret == PiecePriorityNone {
1033 func (t *Torrent) pendRequest(req request) {
1034 ci := chunkIndex(req.chunkSpec, t.chunkSize)
1035 t.pieces[req.Index].pendChunkIndex(ci)
1038 func (t *Torrent) pieceCompletionChanged(piece int) {
1039 log.Call().Add("piece", piece).AddValue(debugLogValue).Log(t.logger)
1040 t.cl.event.Broadcast()
1041 if t.pieceComplete(piece) {
1042 t.onPieceCompleted(piece)
1044 t.onIncompletePiece(piece)
1046 t.updatePiecePriority(piece)
1049 func (t *Torrent) numReceivedConns() (ret int) {
1050 for c := range t.conns {
1051 if c.Discovery == peerSourceIncoming {
1058 func (t *Torrent) maxHalfOpen() int {
1059 // Note that if we somehow exceed the maximum established conns, we want
1060 // the negative value to have an effect.
1061 establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
1062 extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
1063 // We want to allow some experimentation with new peers, and to try to
1064 // upset an oversupply of received connections.
1065 return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.halfOpenLimit)))
1068 func (t *Torrent) openNewConns() {
1069 defer t.updateWantPeersEvent()
1070 for t.peers.Len() != 0 {
1074 if len(t.halfOpen) >= t.maxHalfOpen() {
1077 p := t.peers.PopMax()
// getConnPieceInclination returns a piece-order permutation for a new
// connection, reusing a pooled one when available to avoid rand.Perm cost.
1082 func (t *Torrent) getConnPieceInclination() []int {
1083 _ret := t.connPieceInclinationPool.Get()
1085 pieceInclinationsNew.Add(1)
1086 return rand.Perm(t.numPieces())
1088 pieceInclinationsReused.Add(1)
1089 return *_ret.(*[]int)
1092 func (t *Torrent) putPieceInclination(pi []int) {
1093 t.connPieceInclinationPool.Put(&pi)
1094 pieceInclinationsPut.Add(1)
// updatePieceCompletion refreshes the cached completion state for a piece
// from storage, and fires change handling only if something actually changed.
1097 func (t *Torrent) updatePieceCompletion(piece int) {
1098 pcu := t.pieceCompleteUncached(piece)
1099 p := &t.pieces[piece]
1100 changed := t.completedPieces.Get(piece) != pcu.Complete || p.storageCompletionOk != pcu.Ok
1101 log.Fmsg("piece %d completion: %v", piece, pcu.Ok).AddValue(debugLogValue).Log(t.logger)
1102 p.storageCompletionOk = pcu.Ok
1103 t.completedPieces.Set(piece, pcu.Complete)
1105 // log.Printf("piece %d uncached completion: %v", piece, pcu.Complete)
1106 // log.Printf("piece %d changed: %v", piece, changed)
1108 t.pieceCompletionChanged(piece)
1112 // Non-blocking read. Client lock is not required.
1113 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1114 p := &t.pieces[off/t.info.PieceLength]
1115 p.waitNoPendingWrites()
1116 return p.Storage().ReadAt(b, off-p.Info().Offset())
1119 func (t *Torrent) updateAllPieceCompletions() {
1120 for i := range iter.N(t.numPieces()) {
1121 t.updatePieceCompletion(i)
1125 // Returns an error if the metadata was completed, but couldn't be set for
1126 // some reason. Blame it on the last peer to contribute.
1127 func (t *Torrent) maybeCompleteMetadata() error {
1132 if !t.haveAllMetadataPieces() {
1133 // Don't have enough metadata pieces.
1136 err := t.setInfoBytes(t.metadataBytes)
1138 t.invalidateMetadata()
1139 return fmt.Errorf("error setting info bytes: %s", err)
1141 if t.cl.config.Debug {
1142 log.Printf("%s: got metadata from peers", t)
1147 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1148 t.forReaderOffsetPieces(func(begin, end int) bool {
1149 ret.AddRange(begin, end)
1155 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1156 t.forReaderOffsetPieces(func(begin, end int) bool {
1159 readahead.AddRange(begin+1, end)
1166 func (t *Torrent) needData() bool {
1167 if t.closed.IsSet() {
1173 return t.pendingPieces.Len() != 0
// appendMissingStrings appends to old any strings from new not already
// present, preserving order.
1176 func appendMissingStrings(old, new []string) (ret []string) {
1179 for _, n := range new {
1180 for _, o := range old {
1185 ret = append(ret, n)
1190 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1192 for minNumTiers > len(ret) {
1193 ret = append(ret, nil)
1198 func (t *Torrent) addTrackers(announceList [][]string) {
1199 fullAnnounceList := &t.metainfo.AnnounceList
1200 t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1201 for tierIndex, trackerURLs := range announceList {
1202 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1204 t.startMissingTrackerScrapers()
1205 t.updateWantPeersEvent()
1208 // Don't call this before the info is available.
1209 func (t *Torrent) bytesCompleted() int64 {
1213 return t.info.TotalLength() - t.bytesLeft()
1216 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1218 defer t.cl.mu.Unlock()
1219 return t.setInfoBytes(b)
1222 // Returns true if connection is removed from torrent.Conns.
1223 func (t *Torrent) deleteConnection(c *connection) (ret bool) {
1224 if !c.closed.IsSet() {
1225 panic("connection is not closed")
1226 // There are behaviours prevented by the closed state that will fail
1227 // if the connection has been deleted.
1231 torrent.Add("deleted connections", 1)
1232 c.deleteAllRequests()
1233 if len(t.conns) == 0 {
1234 t.assertNoPendingRequests()
// assertNoPendingRequests panics (programmer-bug invariant) if any requests
// remain counted after the last connection is gone.
1239 func (t *Torrent) assertNoPendingRequests() {
1240 if len(t.pendingRequests) != 0 {
1241 panic(t.pendingRequests)
1245 func (t *Torrent) dropConnection(c *connection) {
1246 t.cl.event.Broadcast()
1248 if t.deleteConnection(c) {
1253 func (t *Torrent) wantPeers() bool {
1254 if t.closed.IsSet() {
1257 if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
1260 return t.needData() || t.seeding()
1263 func (t *Torrent) updateWantPeersEvent() {
1265 t.wantPeersEvent.Set()
1267 t.wantPeersEvent.Clear()
1271 // Returns whether the client should make effort to seed the torrent.
1272 func (t *Torrent) seeding() bool {
1274 if t.closed.IsSet() {
1277 if cl.config.NoUpload {
1280 if !cl.config.Seed {
1283 if cl.config.DisableAggressiveUpload && t.needData() {
1289 func (t *Torrent) startScrapingTracker(_url string) {
1293 u, err := url.Parse(_url)
1295 log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger)
1297 if u.Scheme == "udp" {
// Recurse with the rewritten scheme-specific URL(s).
1299 t.startScrapingTracker(u.String())
1301 t.startScrapingTracker(u.String())
1304 if u.Scheme == "udp4" && (t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4) {
1307 if u.Scheme == "udp6" && t.cl.config.DisableIPv6 {
1310 if _, ok := t.trackerAnnouncers[_url]; ok {
1313 newAnnouncer := &trackerScraper{
1317 if t.trackerAnnouncers == nil {
1318 t.trackerAnnouncers = make(map[string]*trackerScraper)
1320 t.trackerAnnouncers[_url] = newAnnouncer
1321 go newAnnouncer.Run()
1324 // Adds and starts tracker scrapers for tracker URLs that aren't already
1326 func (t *Torrent) startMissingTrackerScrapers() {
1327 if t.cl.config.DisableTrackers {
1330 t.startScrapingTracker(t.metainfo.Announce)
1331 for _, tier := range t.metainfo.AnnounceList {
1332 for _, url := range tier {
1333 t.startScrapingTracker(url)
1338 // Returns an AnnounceRequest with fields filled out to defaults and current
1340 func (t *Torrent) announceRequest() tracker.AnnounceRequest {
1341 // Note that IPAddress is not set. It's set for UDP inside the tracker
1342 // code, since it's dependent on the network in use.
1343 return tracker.AnnounceRequest{
1344 Event: tracker.None,
1346 Port: uint16(t.cl.incomingPeerPort()),
1347 PeerId: t.cl.peerID,
1348 InfoHash: t.infoHash,
1349 Key: t.cl.announceKey(),
1351 // The following are vaguely described in BEP 3.
1353 Left: t.bytesLeftAnnounce(),
1354 Uploaded: t.stats.BytesWrittenData.Int64(),
1355 // There's no mention of wasted or unwanted download in the BEP.
1356 Downloaded: t.stats.BytesReadUsefulData.Int64(),
1360 // Adds peers revealed in an announce until the announce ends, or we have
1362 func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
1364 // Count all the unique addresses we got during this announce.
1365 allAddrs := make(map[string]struct{})
1368 case v, ok := <-pvs:
1372 addPeers := make([]Peer, 0, len(v.Peers))
1373 for _, cp := range v.Peers {
1375 // Can't do anything with this.
1378 addPeers = append(addPeers, Peer{
1381 Source: peerSourceDHTGetPeers,
1383 key := (&net.UDPAddr{
1387 allAddrs[key] = struct{}{}
1390 t.addPeers(addPeers)
1391 numPeers := t.peers.Len()
1393 if numPeers >= cl.config.TorrentPeersHighWater {
1396 case <-t.closed.LockedChan(&cl.mu):
1402 func (t *Torrent) announceDHT(impliedPort bool, s *dht.Server) (err error) {
1404 ps, err := s.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort)
1408 t.consumeDHTAnnounce(ps.Peers)
// dhtAnnouncer loops, announcing to the DHT whenever peers are wanted, until
// the torrent is closed. Waits 5 minutes between announce rounds.
1413 func (t *Torrent) dhtAnnouncer(s *dht.Server) {
1417 case <-t.wantPeersEvent.LockedChan(&cl.mu):
1418 case <-t.closed.LockedChan(&cl.mu):
1421 err := t.announceDHT(true, s)
1424 defer cl.mu.Unlock()
1428 log.Printf("error announcing %q to DHT: %s", t, err)
1432 case <-t.closed.LockedChan(&cl.mu):
1434 case <-time.After(5 * time.Minute):
1439 func (t *Torrent) addPeers(peers []Peer) {
1440 for _, p := range peers {
1445 func (t *Torrent) Stats() TorrentStats {
1447 defer t.cl.mu.RUnlock()
1448 return t.statsLocked()
1451 func (t *Torrent) statsLocked() (ret TorrentStats) {
1452 ret.ActivePeers = len(t.conns)
1453 ret.HalfOpenPeers = len(t.halfOpen)
1454 ret.PendingPeers = t.peers.Len()
1455 ret.TotalPeers = t.numTotalPeers()
1456 ret.ConnectedSeeders = 0
1457 for c := range t.conns {
1458 if all, ok := c.peerHasAllPieces(); all && ok {
1459 ret.ConnectedSeeders++
1462 ret.ConnStats = t.stats.Copy()
1466 // The total number of peers in the torrent.
// Deduplicates by address string across established connections,
// half-open connections, and pending peers.
1467 func (t *Torrent) numTotalPeers() int {
1468 peers := make(map[string]struct{})
1469 for conn := range t.conns {
1470 ra := conn.conn.RemoteAddr()
1472 // It's been closed and doesn't support RemoteAddr.
1475 peers[ra.String()] = struct{}{}
// halfOpen is already keyed by address string.
1477 for addr := range t.halfOpen {
1478 peers[addr] = struct{}{}
// Pending peers are keyed by their IP:port rendering.
1480 t.peers.Each(func(peer Peer) {
1481 peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{}
1486 // Reconcile bytes transferred before connection was associated with a
// torrent: folds the connection's handshake-era byte counters into the
// post-handshake (torrent-level) stats, then marks the connection as
// reconciled.
1488 func (t *Torrent) reconcileHandshakeStats(c *connection) {
// Sanity check: during the handshake only BytesWritten/BytesRead may
// have been incremented. NOTE(review): the consequence branch (likely a
// panic) is elided in this view — confirm against the full source.
1489 if c.stats != (ConnStats{
1490 // Handshakes should only increment these fields:
1491 BytesWritten: c.stats.BytesWritten,
1492 BytesRead: c.stats.BytesRead,
1496 c.postHandshakeStats(func(cs *ConnStats) {
1497 cs.BytesRead.Add(c.stats.BytesRead.Int64())
1498 cs.BytesWritten.Add(c.stats.BytesWritten.Int64())
1500 c.reconciledHandshakeStats = true
1503 // Returns true if the connection is added.
// (Per the signature it actually returns an error; nil presumably means
// added. NOTE(review): the comment above predates the error-returning
// signature — consider updating it in the full source.)
1504 func (t *Torrent) addConnection(c *connection) (err error) {
1507 torrent.Add("added connections", 1)
1510 if t.closed.IsSet() {
1511 return errors.New("torrent closed")
// Resolve duplicate peer IDs among existing connections.
1513 for c0 := range t.conns {
1514 if c.PeerID != c0.PeerID {
1517 if !t.cl.config.dropDuplicatePeerIds {
// Prefer whichever side has the preferred network; drop the other.
1520 if left, ok := c.hasPreferredNetworkOver(c0); ok && left {
1522 t.deleteConnection(c0)
1524 return errors.New("existing connection preferred")
1528 return errors.New("don't want conns")
// At capacity: evict the worst bad connection to make room, if any.
1530 if len(t.conns) >= t.maxEstablishedConns {
1531 c := t.worstBadConn()
1532 if t.cl.config.Debug && missinggo.CryHeard() {
1533 log.Printf("%s: dropping connection to make room for new one:\n %v", t, c)
1536 t.deleteConnection(c)
// Still full after eviction: give up (the return is elided here).
1538 if len(t.conns) >= t.maxEstablishedConns {
1541 t.conns[c] = struct{}{}
// wantConns reports whether the torrent would accept another
// connection: networking must be enabled, the torrent not closed, and
// we must be seeding or still needing data. Below the connection cap we
// always want more; at the cap we only want one if there is a worst bad
// connection we could replace.
1545 func (t *Torrent) wantConns() bool {
1546 if !t.networkingEnabled {
1549 if t.closed.IsSet() {
1552 if !t.seeding() && !t.needData() {
1555 if len(t.conns) < t.maxEstablishedConns {
1558 return t.worstBadConn() != nil
// SetMaxEstablishedConns sets the connection cap and returns the
// previous value. If the new cap is lower than the current connection
// count, the worst connections are dropped (worst-first via a heap over
// worseConn) until we're within the cap.
1561 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1563 defer t.cl.mu.Unlock()
1564 oldMax = t.maxEstablishedConns
1565 t.maxEstablishedConns = max
1566 wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
1567 for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1568 t.dropConnection(wcs.Pop().(*connection))
// mu exposes the torrent's lock as a missinggo.RWLocker.
// NOTE(review): the body is elided in this view — presumably it returns
// the client's mutex; confirm against the full source.
1574 func (t *Torrent) mu() missinggo.RWLocker {
// pieceHashed records the outcome of hashing a piece. On success it
// credits the connections that contributed chunks and marks the piece
// complete in storage. On failure it debits those connections, bans the
// least-trusted toucher's IP, and reverts the piece to incomplete.
1578 func (t *Torrent) pieceHashed(piece int, correct bool) {
1579 log.Fmsg("hashed piece %d", piece).Add("piece", piece).Add("passed", correct).AddValue(debugLogValue).Log(t.logger)
1580 if t.closed.IsSet() {
1583 p := &t.pieces[piece]
// Collect and clear the connections that wrote chunks of this piece.
1584 touchers := t.reapPieceTouchers(piece)
1585 if p.storageCompletionOk {
1586 // Don't score the first time a piece is hashed, it could be an
1589 pieceHashedCorrect.Add(1)
1591 log.Printf("%s: piece %d (%s) failed hash: %d connections contributed", t, piece, p.hash, len(touchers))
1592 pieceHashedNotCorrect.Add(1)
1596 if len(touchers) != 0 {
1597 // Don't increment stats above connection-level for every involved
1599 t.allStats((*ConnStats).incrementPiecesDirtiedGood)
1601 for _, c := range touchers {
1602 c.stats.incrementPiecesDirtiedGood()
1604 err := p.Storage().MarkComplete()
1606 log.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
1609 if len(touchers) != 0 {
1610 // Don't increment stats above connection-level for every involved
1612 t.allStats((*ConnStats).incrementPiecesDirtiedBad)
1613 for _, c := range touchers {
1614 // Y u do dis peer?!
1615 c.stats.incrementPiecesDirtiedBad()
// Sort touchers least-trusted first so we can punish the worst.
1617 slices.Sort(touchers, connLessTrusted)
1618 if t.cl.config.Debug {
1619 log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int64) {
1620 for _, c := range touchers {
1621 ret = append(ret, c.netGoodPiecesDirtied())
// Ban the IP of the chosen connection (selection is elided here).
1627 t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
1630 t.onIncompletePiece(piece)
// NOTE(review): the MarkNotComplete error is not checked on this line
// in the visible source.
1631 p.Storage().MarkNotComplete()
1633 t.updatePieceCompletion(piece)
// cancelRequestsForPiece cancels outstanding requests for the piece on
// every connection (per-connection cancellation is elided in this view).
1636 func (t *Torrent) cancelRequestsForPiece(piece int) {
1637 // TODO: Make faster
1638 for cn := range t.conns {
// onPieceCompleted runs when a piece becomes complete: re-pend its
// chunk specs, cancel any in-flight requests for it, and notify each
// connection (the per-connection action is elided in this view).
1643 func (t *Torrent) onPieceCompleted(piece int) {
1644 t.pendAllChunkSpecs(piece)
1645 t.cancelRequestsForPiece(piece)
1646 for conn := range t.conns {
1651 // Called when a piece is found to be not complete.
// Re-pends the piece's chunks if all were dirty, bails if the piece is
// unwanted, and otherwise nudges every connection that has the piece to
// update its requests.
1652 func (t *Torrent) onIncompletePiece(piece int) {
1653 if t.pieceAllDirty(piece) {
1654 t.pendAllChunkSpecs(piece)
1656 if !t.wantPieceIndex(piece) {
1657 // log.Printf("piece %d incomplete and unwanted", piece)
1660 // We could drop any connections that we told we have a piece that we
1661 // don't here. But there's a test failure, and it seems clients don't care
1662 // if you request pieces that you already claim to have. Pruning bad
1663 // connections might just remove any connections that aren't treating us
1664 // favourably anyway.
1666 // for c := range t.conns {
1667 // if c.sentHave(piece) {
1671 for conn := range t.conns {
1672 if conn.PeerHasPiece(piece) {
1673 conn.updateRequests()
// verifyPiece hashes a queued piece and reports the result. It waits
// until no other hash is running on the piece and storage is available,
// removes the piece from the hash queue, skips work if the torrent is
// closed or the piece already complete, then hashes under the storage
// read lock and compares against the expected piece hash.
1678 func (t *Torrent) verifyPiece(piece int) {
1681 defer cl.mu.Unlock()
1682 p := &t.pieces[piece]
1685 cl.event.Broadcast()
// Wait for any concurrent hashing to finish and storage to be opened.
1687 for p.hashing || t.storage == nil {
1690 if !p.t.piecesQueuedForHash.Remove(piece) {
1691 panic("piece was not queued")
1693 if t.closed.IsSet() || t.pieceComplete(piece) {
1694 t.updatePiecePriority(piece)
1698 t.publishPieceChange(piece)
// Hash under the storage read lock so a concurrent Close can't race us.
1699 t.storageLock.RLock()
1701 sum := t.hashPiece(piece)
1702 t.storageLock.RUnlock()
1705 t.pieceHashed(piece, sum == p.hash)
1706 t.publishPieceChange(piece)
1709 // Return the connections that touched a piece, and clear the entries while
// doing so: both the per-connection peerTouchedPieces entry and the
// piece's dirtiers set are reset.
1711 func (t *Torrent) reapPieceTouchers(piece int) (ret []*connection) {
1712 for c := range t.pieces[piece].dirtiers {
1713 delete(c.peerTouchedPieces, piece)
1714 ret = append(ret, c)
1716 t.pieces[piece].dirtiers = nil
// connsAsSlice copies the connection set into a slice (the set is a
// map, so ordering is unspecified).
1720 func (t *Torrent) connsAsSlice() (ret []*connection) {
1721 for c := range t.conns {
1722 ret = append(ret, c)
1727 // Currently doesn't really queue, but should in the future.
// Marks the piece as queued for hashing (no-op if already queued),
// publishes the state change, and spawns verification in a goroutine.
1728 func (t *Torrent) queuePieceCheck(pieceIndex int) {
1729 piece := &t.pieces[pieceIndex]
1730 if piece.queuedForHash() {
1733 t.piecesQueuedForHash.Add(pieceIndex)
1734 t.publishPieceChange(pieceIndex)
1735 go t.verifyPiece(pieceIndex)
// VerifyData verifies every piece of the torrent by delegating to each
// piece's VerifyData.
1738 func (t *Torrent) VerifyData() {
1739 for i := range iter.N(t.NumPieces()) {
1740 t.Piece(i).VerifyData()
1744 // Start the process of connecting to the given peer for the given torrent if
// appropriate: skip ourselves, banned IP/ports, and addresses we're
// already connected (or connecting) to, then record the attempt in
// halfOpen and dial asynchronously.
1746 func (t *Torrent) initiateConn(peer Peer) {
// Never connect to ourselves.
1747 if peer.Id == t.cl.peerID {
1750 if t.cl.badPeerIPPort(peer.IP, peer.Port) {
1753 addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
1754 if t.addrActive(addr) {
1757 t.halfOpen[addr] = peer
1758 go t.cl.outgoingConnection(t, addr, peer.Source)
// AddClientPeer adds another Client's listen addresses as peers of this
// torrent — useful for connecting two local clients directly.
1761 func (t *Torrent) AddClientPeer(cl *Client) {
1762 t.AddPeers(func() (ps []Peer) {
1763 for _, la := range cl.ListenAddrs() {
1764 ps = append(ps, Peer{
1765 IP: missinggo.AddrIP(la),
1766 Port: missinggo.AddrPort(la),
1773 // All stats that include this Torrent. Useful when we want to increment
1774 // ConnStats but not for every connection.
1775 func (t *Torrent) allStats(f func(*ConnStats)) {