19 "github.com/anacrolix/dht"
20 "github.com/anacrolix/missinggo"
21 "github.com/anacrolix/missinggo/bitmap"
22 "github.com/anacrolix/missinggo/perf"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/bradfitz/iter"
27 "github.com/anacrolix/torrent/bencode"
28 "github.com/anacrolix/torrent/metainfo"
29 pp "github.com/anacrolix/torrent/peer_protocol"
30 "github.com/anacrolix/torrent/storage"
31 "github.com/anacrolix/torrent/tracker"
34 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
35 return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
38 type peersKey struct {
43 // Maintains state of torrent within a Client.
47 networkingEnabled bool
50 closed missinggo.Event
51 infoHash metainfo.Hash
53 // Values are the piece indices that changed.
54 pieceStateChanges *pubsub.PubSub
55 // The size of chunks to request from peers over the wire. This is
56 // normally 16KiB by convention these days.
59 // Total length of the torrent in bytes. Stored because it's not O(1) to
60 // get this from the info dict.
63 // The storage to open when the info dict becomes available.
64 storageOpener *storage.Client
65 // Storage for torrent data.
66 storage *storage.Torrent
68 metainfo metainfo.MetaInfo
70 // The info dict. nil if we don't have it (yet).
73 // Active peer connections, running message stream loops.
74 conns map[*connection]struct{}
75 maxEstablishedConns int
76 // Set of addrs to which we're attempting to connect. Connections are
77 // half-open until all handshakes are completed.
78 halfOpen map[string]Peer
79 fastestConn *connection
81 // Reserve of peers to connect to. A peer can be both here and in the
82 // active connections if were told about the peer after connecting with
83 // them. That encourages us to reconnect to peers that are well known in
85 peers map[peersKey]Peer
86 wantPeersEvent missinggo.Event
87 // An announcer for each tracker URL.
88 trackerAnnouncers map[string]*trackerScraper
89 // How many times we've initiated a DHT announce. TODO: Move into stats.
92 // Name used if the info name isn't available. Should be cleared when the
93 // Info does become available.
95 // The bencoded bytes of the info dict. This is actively manipulated if
96 // the info bytes aren't initially available, and we try to fetch them
99 // Each element corresponds to the 16KiB metadata pieces. If true, we have
100 // received that piece.
101 metadataCompletedChunks []bool
103 // Set when .Info is obtained.
104 gotMetainfo missinggo.Event
106 readers map[*Reader]struct{}
107 readerNowPieces bitmap.Bitmap
108 readerReadaheadPieces bitmap.Bitmap
110 // The indexes of pieces we want with normal priority, that aren't
111 // currently available.
112 pendingPieces bitmap.Bitmap
113 // A cache of completed piece indices.
114 completedPieces bitmap.Bitmap
115 // Pieces that need to be hashed.
116 piecesQueuedForHash bitmap.Bitmap
118 // A pool of piece priorities []int for assignment to new connections.
119 // These "inclinations" are used to give connections preference for
121 connPieceInclinationPool sync.Pool
122 // Torrent-level statistics.
126 // Returns a channel that is closed when the Torrent is closed.
127 func (t *Torrent) Closed() <-chan struct{} {
128 return t.closed.LockedChan(&t.cl.mu)
131 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
132 // pending, and half-open peers.
// NOTE(review): this listing is truncated — several lines of this function
// (loop/brace closers, error handling after SplitHostPort/Atoi, and the
// remaining Peer literal fields) are missing. Code left byte-identical.
133 func (t *Torrent) KnownSwarm() (ks []Peer) {
134 // Add pending peers to the list
135 for _, peer := range t.peers {
136 ks = append(ks, peer)
139 // Add half-open peers to the list
140 for _, peer := range t.halfOpen {
141 ks = append(ks, peer)
144 // Add active peers to the list
145 for conn := range t.conns {
// Split the remote address into host and port to build a Peer entry.
146 host, portString, err := net.SplitHostPort(conn.remoteAddr().String())
// presumably the dropped lines skip this conn on err — confirm against upstream.
151 ip := net.ParseIP(host)
152 port, err := strconv.Atoi(portString)
157 ks = append(ks, Peer{
161 Source: conn.Discovery,
162 // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
163 // > But if we're not connected to them with an encrypted connection, I couldn't say
164 // > what's appropriate. We can carry forward the SupportsEncryption value as we
165 // > received it from trackers/DHT/PEX, or just use the encryption state for the
166 // > connection. It's probably easiest to do the latter for now.
167 // https://github.com/anacrolix/torrent/pull/188
168 SupportsEncryption: conn.headerEncrypted,
175 func (t *Torrent) setChunkSize(size pp.Integer) {
177 t.chunkPool = &sync.Pool{
178 New: func() interface{} {
179 return make([]byte, size)
184 func (t *Torrent) setDisplayName(dn string) {
191 func (t *Torrent) pieceComplete(piece int) bool {
192 return t.completedPieces.Get(piece)
195 func (t *Torrent) pieceCompleteUncached(piece int) storage.Completion {
196 return t.pieces[piece].Storage().Completion()
199 // There's a connection to that address already.
200 func (t *Torrent) addrActive(addr string) bool {
201 if _, ok := t.halfOpen[addr]; ok {
204 for c := range t.conns {
205 if c.remoteAddr().String() == addr {
212 func (t *Torrent) unclosedConnsAsSlice() (ret []*connection) {
213 ret = make([]*connection, 0, len(t.conns))
214 for c := range t.conns {
215 if !c.closed.IsSet() {
222 func (t *Torrent) addPeer(p Peer) {
225 if len(t.peers) >= torrentPeersHighWater {
228 key := peersKey{string(p.IP), p.Port}
229 if _, ok := t.peers[key]; ok {
233 peersAddedBySource.Add(string(p.Source), 1)
238 func (t *Torrent) invalidateMetadata() {
239 for i := range t.metadataCompletedChunks {
240 t.metadataCompletedChunks[i] = false
245 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
249 if index >= len(t.metadataCompletedChunks) {
250 log.Printf("%s: ignoring metadata piece %d", t, index)
253 copy(t.metadataBytes[(1<<14)*index:], data)
254 t.metadataCompletedChunks[index] = true
257 func (t *Torrent) metadataPieceCount() int {
258 return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
261 func (t *Torrent) haveMetadataPiece(piece int) bool {
263 return (1<<14)*piece < len(t.metadataBytes)
265 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
269 func (t *Torrent) metadataSizeKnown() bool {
270 return t.metadataBytes != nil
273 func (t *Torrent) metadataSize() int {
274 return len(t.metadataBytes)
277 func infoPieceHashes(info *metainfo.Info) (ret []string) {
278 for i := 0; i < len(info.Pieces); i += sha1.Size {
279 ret = append(ret, string(info.Pieces[i:i+sha1.Size]))
284 func (t *Torrent) makePieces() {
285 hashes := infoPieceHashes(t.info)
286 t.pieces = make([]Piece, len(hashes))
287 for i, hash := range hashes {
288 piece := &t.pieces[i]
291 piece.noPendingWrites.L = &piece.pendingWritesMutex
292 missinggo.CopyExact(piece.hash[:], hash)
296 // Called when metadata for a torrent becomes available.
297 func (t *Torrent) setInfoBytes(b []byte) error {
301 if metainfo.HashBytes(b) != t.infoHash {
302 return errors.New("info bytes have wrong hash")
304 var info metainfo.Info
305 err := bencode.Unmarshal(b, &info)
307 return fmt.Errorf("error unmarshalling info bytes: %s", err)
309 err = validateInfo(&info)
311 return fmt.Errorf("bad info: %s", err)
313 defer t.updateWantPeersEvent()
315 t.displayName = "" // Save a few bytes lol.
316 t.cl.event.Broadcast()
318 t.storage, err = t.storageOpener.OpenTorrent(t.info, t.infoHash)
320 return fmt.Errorf("error opening torrent storage: %s", err)
323 for _, f := range t.info.UpvertedFiles() {
327 t.metadataCompletedChunks = nil
329 for conn := range t.conns {
330 if err := conn.setNumPieces(t.numPieces()); err != nil {
331 log.Printf("closing connection: %s", err)
335 for i := range t.pieces {
336 t.updatePieceCompletion(i)
338 if !p.storageCompletionOk {
339 log.Printf("piece %s completion unknown, queueing check", p)
346 func (t *Torrent) haveAllMetadataPieces() bool {
350 if t.metadataCompletedChunks == nil {
353 for _, have := range t.metadataCompletedChunks {
361 // TODO: Propagate errors to disconnect peer.
362 func (t *Torrent) setMetadataSize(bytes int64) (err error) {
364 // We already know the correct metadata size.
367 if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
368 return errors.New("bad size")
370 if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
373 t.metadataBytes = make([]byte, bytes)
374 t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
375 for c := range t.conns {
376 c.requestPendingMetadata()
381 // The current working name for the torrent. Either the name in the info dict,
382 // or a display name given such as by the dn value in a magnet link, or "".
383 func (t *Torrent) name() string {
390 func (t *Torrent) pieceState(index int) (ret PieceState) {
391 p := &t.pieces[index]
392 ret.Priority = t.piecePriority(index)
393 if t.pieceComplete(index) {
396 if p.queuedForHash() || p.hashing {
399 if !ret.Complete && t.piecePartiallyDownloaded(index) {
405 func (t *Torrent) metadataPieceSize(piece int) int {
406 return metadataPieceSize(len(t.metadataBytes), piece)
409 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
415 d["total_size"] = len(t.metadataBytes)
417 p, err := bencode.Marshal(d)
423 ExtendedID: c.PeerExtensionIDs["ut_metadata"],
424 ExtendedPayload: append(p, data...),
428 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
429 rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
430 ret = append(ret, PieceStateRun{
431 PieceState: el.(PieceState),
435 for index := range t.pieces {
436 rle.Append(t.pieceState(index), 1)
442 // Produces a small string representing a PieceStateRun.
443 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
444 ret = fmt.Sprintf("%d", psr.Length)
445 ret += func() string {
446 switch psr.Priority {
447 case PiecePriorityNext:
449 case PiecePriorityNormal:
451 case PiecePriorityReadahead:
453 case PiecePriorityNow:
471 func (t *Torrent) writeStatus(w io.Writer) {
472 fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
473 fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
475 fmt.Fprintf(w, "Metadata have: ")
476 for _, h := range t.metadataCompletedChunks {
477 fmt.Fprintf(w, "%c", func() rune {
487 fmt.Fprintf(w, "Piece length: %s\n", func() string {
489 return fmt.Sprint(t.usualPieceSize())
495 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
496 fmt.Fprint(w, "Piece States:")
497 for _, psr := range t.pieceStateRuns() {
499 w.Write([]byte(pieceStateRunStatusChars(psr)))
503 fmt.Fprintf(w, "Reader Pieces:")
504 t.forReaderOffsetPieces(func(begin, end int) (again bool) {
505 fmt.Fprintf(w, " %d:%d", begin, end)
510 fmt.Fprintf(w, "Trackers:\n")
512 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
513 fmt.Fprintf(tw, " URL\tNext announce\tLast announce\n")
514 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
516 }).([]*trackerScraper) {
517 fmt.Fprintf(tw, " %s\n", ta.statusLine())
522 fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
524 fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
525 fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
526 fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
527 conns := t.connsAsSlice()
528 slices.Sort(conns, worseConn)
529 for i, c := range conns {
530 fmt.Fprintf(w, "%2d. ", i+1)
535 func (t *Torrent) haveInfo() bool {
539 // Returns a run-time generated MetaInfo that includes the info bytes and
540 // announce-list as currently known to the client.
541 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
542 return metainfo.MetaInfo{
543 CreationDate: time.Now().Unix(),
544 Comment: "dynamic metainfo from client",
545 CreatedBy: "go.torrent",
546 AnnounceList: t.metainfo.UpvertedAnnounceList(),
547 InfoBytes: t.metadataBytes,
551 func (t *Torrent) BytesMissing() int64 {
553 defer t.mu().RUnlock()
554 return t.bytesMissingLocked()
557 func (t *Torrent) bytesMissingLocked() int64 {
561 func (t *Torrent) bytesLeft() (left int64) {
562 bitmap.Flip(t.completedPieces, 0, t.numPieces()).IterTyped(func(piece int) bool {
564 left += int64(p.length() - p.numDirtyBytes())
570 // Bytes left to give in tracker announces.
571 func (t *Torrent) bytesLeftAnnounce() uint64 {
573 return uint64(t.bytesLeft())
575 return math.MaxUint64
579 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
580 if t.pieceComplete(piece) {
583 if t.pieceAllDirty(piece) {
586 return t.pieces[piece].hasDirtyChunks()
589 func (t *Torrent) usualPieceSize() int {
590 return int(t.info.PieceLength)
593 func (t *Torrent) numPieces() int {
594 return t.info.NumPieces()
597 func (t *Torrent) numPiecesCompleted() (num int) {
598 return t.completedPieces.Len()
601 func (t *Torrent) close() (err error) {
603 if t.storage != nil {
606 for conn := range t.conns {
609 t.cl.event.Broadcast()
610 t.pieceStateChanges.Close()
611 t.updateWantPeersEvent()
615 func (t *Torrent) requestOffset(r request) int64 {
616 return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
619 // Return the request that would include the given offset into the torrent
620 // data. Returns !ok if there is no such request.
621 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
622 return torrentOffsetRequest(t.length, t.info.PieceLength, int64(t.chunkSize), off)
625 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
626 tr := perf.NewTimer()
628 n, err := t.pieces[piece].Storage().WriteAt(data, begin)
629 if err == nil && n != len(data) {
630 err = io.ErrShortWrite
633 tr.Mark("write chunk")
638 func (t *Torrent) bitfield() (bf []bool) {
639 bf = make([]bool, t.numPieces())
640 t.completedPieces.IterTyped(func(piece int) (again bool) {
647 func (t *Torrent) pieceNumChunks(piece int) int {
648 return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
651 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
652 t.pieces[pieceIndex].dirtyChunks.Clear()
660 // Peer is known to support encryption.
661 SupportsEncryption bool
664 func (t *Torrent) pieceLength(piece int) pp.Integer {
665 if piece == t.numPieces()-1 {
666 ret := pp.Integer(t.length % t.info.PieceLength)
671 return pp.Integer(t.info.PieceLength)
674 func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
675 hash := pieceHash.New()
676 p := &t.pieces[piece]
677 p.waitNoPendingWrites()
678 ip := t.info.Piece(piece)
680 n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
682 missinggo.CopyExact(&ret, hash.Sum(nil))
685 if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
686 log.Printf("unexpected error hashing piece with %T: %s", t.storage.TorrentImpl, err)
691 func (t *Torrent) haveAnyPieces() bool {
692 for i := range t.pieces {
693 if t.pieceComplete(i) {
700 func (t *Torrent) havePiece(index int) bool {
701 return t.haveInfo() && t.pieceComplete(index)
704 func (t *Torrent) haveChunk(r request) (ret bool) {
706 // log.Println("have chunk", r, ret)
711 if t.pieceComplete(int(r.Index)) {
714 p := &t.pieces[r.Index]
715 return !p.pendingChunk(r.chunkSpec, t.chunkSize)
718 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
719 return int(cs.Begin / chunkSize)
722 func (t *Torrent) wantPiece(r request) bool {
723 if !t.wantPieceIndex(int(r.Index)) {
726 if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
729 // TODO: What about pieces that were wanted, but aren't now, and aren't
730 // completed either? That used to be done here.
734 func (t *Torrent) wantPieceIndex(index int) bool {
738 if index < 0 || index >= t.numPieces() {
741 p := &t.pieces[index]
742 if p.queuedForHash() {
748 if t.pieceComplete(index) {
751 if t.pendingPieces.Contains(index) {
754 return !t.forReaderOffsetPieces(func(begin, end int) bool {
755 return index < begin || index >= end
759 // The worst connection is one that hasn't been sent, or sent anything useful
760 // for the longest. A bad connection is one that usually sends us unwanted
761 // pieces, or has been in worser half of the established connections for more
// NOTE(review): the listing dropped several lines of this function (heap
// initialization, the pop loop structure, and the returns). Left byte-identical.
763 func (t *Torrent) worstBadConn() *connection {
764 wcs := worseConnSlice{t.unclosedConnsAsSlice()}
// presumably heap.Init(&wcs) happens in a dropped line before popping — confirm.
767 c := heap.Pop(&wcs).(*connection)
// A conn that delivered >= 6 unwanted chunks and more unwanted than useful is bad.
768 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
// Also bad: stuck in the worse half of established conns past the grace period.
771 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
772 // Give connections 1 minute to prove themselves.
773 if time.Since(c.completedHandshake) > time.Minute {
781 type PieceStateChange struct {
786 func (t *Torrent) publishPieceChange(piece int) {
787 cur := t.pieceState(piece)
788 p := &t.pieces[piece]
789 if cur != p.publicPieceState {
790 p.publicPieceState = cur
791 t.pieceStateChanges.Publish(PieceStateChange{
798 func (t *Torrent) pieceNumPendingChunks(piece int) int {
799 if t.pieceComplete(piece) {
802 return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
805 func (t *Torrent) pieceAllDirty(piece int) bool {
806 return t.pieces[piece].dirtyChunks.Len() == t.pieceNumChunks(piece)
809 func (t *Torrent) readersChanged() {
810 t.updateReaderPieces()
811 t.updateAllPiecePriorities()
814 func (t *Torrent) updateReaderPieces() {
815 t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities()
818 func (t *Torrent) readerPosChanged(from, to pieceRange) {
822 t.updateReaderPieces()
823 // Order the ranges, high and low.
825 if l.begin > h.begin {
829 // Two distinct ranges.
830 t.updatePiecePriorities(l.begin, l.end)
831 t.updatePiecePriorities(h.begin, h.end)
838 t.updatePiecePriorities(l.begin, end)
842 func (t *Torrent) maybeNewConns() {
843 // Tickle the accept routine.
844 t.cl.event.Broadcast()
848 func (t *Torrent) piecePriorityChanged(piece int) {
849 for c := range t.conns {
850 if c.updatePiecePriority(piece) {
855 t.publishPieceChange(piece)
858 func (t *Torrent) updatePiecePriority(piece int) {
859 p := &t.pieces[piece]
860 newPrio := t.piecePriorityUncached(piece)
861 if newPrio == p.priority {
865 t.piecePriorityChanged(piece)
868 func (t *Torrent) updateAllPiecePriorities() {
869 t.updatePiecePriorities(0, len(t.pieces))
872 // Update all piece priorities in one hit. This function should have the same
873 // output as updatePiecePriority, but across all pieces.
874 func (t *Torrent) updatePiecePriorities(begin, end int) {
875 for i := begin; i < end; i++ {
876 t.updatePiecePriority(i)
880 // Returns the range of pieces [begin, end) that contains the extent of bytes.
881 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
892 begin = int(off / t.info.PieceLength)
893 end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
894 if end > t.info.NumPieces() {
895 end = t.info.NumPieces()
900 // Returns true if all iterations complete without breaking. Returns the read
901 // regions for all readers. The reader regions should not be merged as some
902 // callers depend on this method to enumerate readers.
903 func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
904 for r := range t.readers {
906 if p.begin >= p.end {
909 if !f(p.begin, p.end) {
916 func (t *Torrent) piecePriority(piece int) piecePriority {
918 return PiecePriorityNone
920 return t.pieces[piece].priority
923 func (t *Torrent) piecePriorityUncached(piece int) piecePriority {
924 if t.pieceComplete(piece) {
925 return PiecePriorityNone
927 if t.readerNowPieces.Contains(piece) {
928 return PiecePriorityNow
930 // if t.readerNowPieces.Contains(piece - 1) {
931 // return PiecePriorityNext
933 if t.readerReadaheadPieces.Contains(piece) {
934 return PiecePriorityReadahead
936 if t.pendingPieces.Contains(piece) {
937 return PiecePriorityNormal
939 return PiecePriorityNone
942 func (t *Torrent) pendPiece(piece int) {
943 if t.pendingPieces.Contains(piece) {
946 if t.havePiece(piece) {
949 t.pendingPieces.Add(piece)
950 t.updatePiecePriority(piece)
953 func (t *Torrent) unpendPieces(unpend bitmap.Bitmap) {
954 t.pendingPieces.Sub(unpend)
955 unpend.IterTyped(func(piece int) (again bool) {
956 t.updatePiecePriority(piece)
961 func (t *Torrent) pendPieceRange(begin, end int) {
962 for i := begin; i < end; i++ {
967 func (t *Torrent) unpendPieceRange(begin, end int) {
969 bm.AddRange(begin, end)
973 func (t *Torrent) pendRequest(req request) {
974 ci := chunkIndex(req.chunkSpec, t.chunkSize)
975 t.pieces[req.Index].pendChunkIndex(ci)
978 func (t *Torrent) pieceCompletionChanged(piece int) {
979 t.cl.event.Broadcast()
980 if t.pieceComplete(piece) {
981 t.onPieceCompleted(piece)
983 t.onIncompletePiece(piece)
985 t.updatePiecePriority(piece)
988 func (t *Torrent) openNewConns() {
992 func (t *Torrent) getConnPieceInclination() []int {
993 _ret := t.connPieceInclinationPool.Get()
995 pieceInclinationsNew.Add(1)
996 return rand.Perm(t.numPieces())
998 pieceInclinationsReused.Add(1)
1002 func (t *Torrent) putPieceInclination(pi []int) {
1003 t.connPieceInclinationPool.Put(pi)
1004 pieceInclinationsPut.Add(1)
1007 func (t *Torrent) updatePieceCompletion(piece int) {
1008 pcu := t.pieceCompleteUncached(piece)
1009 p := &t.pieces[piece]
1010 changed := t.completedPieces.Get(piece) != pcu.Complete || p.storageCompletionOk != pcu.Ok
1011 p.storageCompletionOk = pcu.Ok
1012 t.completedPieces.Set(piece, pcu.Complete)
1014 t.pieceCompletionChanged(piece)
1018 // Non-blocking read. Client lock is not required.
1019 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1020 p := &t.pieces[off/t.info.PieceLength]
1021 p.waitNoPendingWrites()
1022 return p.Storage().ReadAt(b, off-p.Info().Offset())
1025 func (t *Torrent) updateAllPieceCompletions() {
1026 for i := range iter.N(t.numPieces()) {
1027 t.updatePieceCompletion(i)
1031 // Returns an error if the metadata was completed, but couldn't be set for
1032 // some reason. Blame it on the last peer to contribute.
1033 func (t *Torrent) maybeCompleteMetadata() error {
1038 if !t.haveAllMetadataPieces() {
1039 // Don't have enough metadata pieces.
1042 err := t.setInfoBytes(t.metadataBytes)
1044 t.invalidateMetadata()
1045 return fmt.Errorf("error setting info bytes: %s", err)
1047 if t.cl.config.Debug {
1048 log.Printf("%s: got metadata from peers", t)
1053 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1054 t.forReaderOffsetPieces(func(begin, end int) bool {
1055 ret.AddRange(begin, end)
1061 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1062 t.forReaderOffsetPieces(func(begin, end int) bool {
1065 readahead.AddRange(begin+1, end)
1072 func (t *Torrent) needData() bool {
1073 if t.closed.IsSet() {
1079 if t.pendingPieces.Len() != 0 {
1082 // Read as "not all complete".
1083 return !t.readerPieces().IterTyped(func(piece int) bool {
1084 return t.pieceComplete(piece)
// appendMissingStrings returns old extended with each element of new that is
// not already present in old. Order is preserved; duplicates within old are
// kept; the result may share old's backing array.
// NOTE(review): the listing dropped the seed assignment and the dedup
// control-flow lines; restored here with a flag instead of a labeled continue.
func appendMissingStrings(old, new []string) (ret []string) {
	ret = old
	for _, n := range new {
		found := false
		for _, o := range old {
			if o == n {
				found = true
				break
			}
		}
		if !found {
			ret = append(ret, n)
		}
	}
	return
}
// appendMissingTrackerTiers pads the existing announce-list with empty (nil)
// tiers until it has at least minNumTiers tiers, returning the result.
// NOTE(review): the listing dropped the seed assignment and return; restored.
func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
	ret = existing
	for minNumTiers > len(ret) {
		ret = append(ret, nil)
	}
	return
}
1110 func (t *Torrent) addTrackers(announceList [][]string) {
1111 fullAnnounceList := &t.metainfo.AnnounceList
1112 t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1113 for tierIndex, trackerURLs := range announceList {
1114 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1116 t.startMissingTrackerScrapers()
1117 t.updateWantPeersEvent()
1120 // Don't call this before the info is available.
1121 func (t *Torrent) bytesCompleted() int64 {
1125 return t.info.TotalLength() - t.bytesLeft()
1128 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1130 defer t.cl.mu.Unlock()
1131 return t.setInfoBytes(b)
1134 // Returns true if connection is removed from torrent.Conns.
1135 func (t *Torrent) deleteConnection(c *connection) (ret bool) {
1141 func (t *Torrent) dropConnection(c *connection) {
1142 t.cl.event.Broadcast()
1144 if t.deleteConnection(c) {
1149 func (t *Torrent) wantPeers() bool {
1150 if t.closed.IsSet() {
1153 if len(t.peers) > torrentPeersLowWater {
1156 return t.needData() || t.seeding()
1159 func (t *Torrent) updateWantPeersEvent() {
1161 t.wantPeersEvent.Set()
1163 t.wantPeersEvent.Clear()
1167 // Returns whether the client should make effort to seed the torrent.
1168 func (t *Torrent) seeding() bool {
1170 if t.closed.IsSet() {
1173 if cl.config.NoUpload {
1176 if !cl.config.Seed {
1185 func (t *Torrent) startScrapingTracker(url string) {
1189 if _, ok := t.trackerAnnouncers[url]; ok {
1192 newAnnouncer := &trackerScraper{
1196 if t.trackerAnnouncers == nil {
1197 t.trackerAnnouncers = make(map[string]*trackerScraper)
1199 t.trackerAnnouncers[url] = newAnnouncer
1200 go newAnnouncer.Run()
1203 // Adds and starts tracker scrapers for tracker URLs that aren't already
1205 func (t *Torrent) startMissingTrackerScrapers() {
1206 if t.cl.config.DisableTrackers {
1209 t.startScrapingTracker(t.metainfo.Announce)
1210 for _, tier := range t.metainfo.AnnounceList {
1211 for _, url := range tier {
1212 t.startScrapingTracker(url)
1217 // Returns an AnnounceRequest with fields filled out to defaults and current
1219 func (t *Torrent) announceRequest() tracker.AnnounceRequest {
1220 return tracker.AnnounceRequest{
1221 Event: tracker.None,
1223 Port: uint16(t.cl.incomingPeerPort()),
1224 PeerId: t.cl.peerID,
1225 InfoHash: t.infoHash,
1226 Left: t.bytesLeftAnnounce(),
1230 // Adds peers revealed in an announce until the announce ends, or we have
1232 func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
1234 // Count all the unique addresses we got during this announce.
1235 allAddrs := make(map[string]struct{})
1238 case v, ok := <-pvs:
1242 addPeers := make([]Peer, 0, len(v.Peers))
1243 for _, cp := range v.Peers {
1245 // Can't do anything with this.
1248 addPeers = append(addPeers, Peer{
1251 Source: peerSourceDHTGetPeers,
1253 key := (&net.UDPAddr{
1257 allAddrs[key] = struct{}{}
1260 t.addPeers(addPeers)
1261 numPeers := len(t.peers)
1263 if numPeers >= torrentPeersHighWater {
1266 case <-t.closed.LockedChan(&cl.mu):
1272 func (t *Torrent) announceDHT(impliedPort bool) (err error) {
1274 ps, err := cl.dHT.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort)
1278 t.consumeDHTAnnounce(ps.Peers)
1283 func (t *Torrent) dhtAnnouncer() {
1287 case <-t.wantPeersEvent.LockedChan(&cl.mu):
1288 case <-t.closed.LockedChan(&cl.mu):
1291 err := t.announceDHT(true)
1294 defer cl.mu.Unlock()
1298 log.Printf("error announcing %q to DHT: %s", t, err)
1302 case <-t.closed.LockedChan(&cl.mu):
1304 case <-time.After(5 * time.Minute):
1309 func (t *Torrent) addPeers(peers []Peer) {
1310 for _, p := range peers {
1311 if t.cl.badPeerIPPort(p.IP, p.Port) {
1318 func (t *Torrent) Stats() TorrentStats {
1320 defer t.cl.mu.Unlock()
1322 t.stats.ActivePeers = len(t.conns)
1323 t.stats.HalfOpenPeers = len(t.halfOpen)
1324 t.stats.PendingPeers = len(t.peers)
1325 t.stats.TotalPeers = t.numTotalPeers()
1330 // The total number of peers in the torrent.
1331 func (t *Torrent) numTotalPeers() int {
1332 peers := make(map[string]struct{})
1333 for conn := range t.conns {
1334 peers[conn.conn.RemoteAddr().String()] = struct{}{}
1336 for addr := range t.halfOpen {
1337 peers[addr] = struct{}{}
1339 for _, peer := range t.peers {
1340 peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{}
1345 // Returns true if the connection is added.
1346 func (t *Torrent) addConnection(c *connection, outgoing bool) bool {
1347 if t.cl.closed.IsSet() {
1353 for c0 := range t.conns {
1354 if c.PeerID == c0.PeerID {
1355 // Already connected to a client with that ID.
1356 duplicateClientConns.Add(1)
1357 lower := string(t.cl.peerID[:]) < string(c.PeerID[:])
1358 // Retain the connection from initiated from lower peer ID to
1360 if outgoing == lower {
1361 // Close the other one.
1363 // Is it safe to delete from the map while we're iterating
1365 t.deleteConnection(c0)
1367 // Abandon this one.
1372 if len(t.conns) >= t.maxEstablishedConns {
1373 c := t.worstBadConn()
1377 if t.cl.config.Debug && missinggo.CryHeard() {
1378 log.Printf("%s: dropping connection to make room for new one:\n %s", t, c)
1381 t.deleteConnection(c)
1383 if len(t.conns) >= t.maxEstablishedConns {
1387 panic("connection already associated with a torrent")
1389 // Reconcile bytes transferred before connection was associated with a
1391 t.stats.wroteBytes(c.stats.BytesWritten)
1392 t.stats.readBytes(c.stats.BytesRead)
1394 t.conns[c] = struct{}{}
1398 func (t *Torrent) wantConns() bool {
1399 if !t.networkingEnabled {
1402 if t.closed.IsSet() {
1405 if !t.seeding() && !t.needData() {
1408 if len(t.conns) < t.maxEstablishedConns {
1411 return t.worstBadConn() != nil
1414 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1416 defer t.cl.mu.Unlock()
1417 oldMax = t.maxEstablishedConns
1418 t.maxEstablishedConns = max
1419 wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
1420 for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1421 t.dropConnection(wcs.Pop().(*connection))
1427 func (t *Torrent) mu() missinggo.RWLocker {
1431 func (t *Torrent) pieceHashed(piece int, correct bool) {
1432 if t.closed.IsSet() {
1435 p := &t.pieces[piece]
1436 touchers := t.reapPieceTouchers(piece)
1438 // Don't score the first time a piece is hashed, it could be an
1441 pieceHashedCorrect.Add(1)
1443 log.Printf("%s: piece %d (%s) failed hash: %d connections contributed", t, piece, p.hash, len(touchers))
1444 pieceHashedNotCorrect.Add(1)
1449 for _, c := range touchers {
1450 c.goodPiecesDirtied++
1452 err := p.Storage().MarkComplete()
1454 log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
1456 t.updatePieceCompletion(piece)
1458 if len(touchers) != 0 {
1459 for _, c := range touchers {
1460 // Y u do dis peer?!
1461 c.badPiecesDirtied++
1463 slices.Sort(touchers, connLessTrusted)
1464 log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int) {
1465 for _, c := range touchers {
1466 ret = append(ret, c.netGoodPiecesDirtied())
1471 t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
1474 t.onIncompletePiece(piece)
1478 func (t *Torrent) cancelRequestsForPiece(piece int) {
1479 for cn := range t.conns {
1484 func (t *Torrent) onPieceCompleted(piece int) {
1485 t.pendingPieces.Remove(piece)
1486 t.pendAllChunkSpecs(piece)
1487 t.cancelRequestsForPiece(piece)
1488 for conn := range t.conns {
1493 func (t *Torrent) onIncompletePiece(piece int) {
1494 if t.pieceAllDirty(piece) {
1495 t.pendAllChunkSpecs(piece)
1497 if !t.wantPieceIndex(piece) {
1500 // We could drop any connections that we told we have a piece that we
1501 // don't here. But there's a test failure, and it seems clients don't care
1502 // if you request pieces that you already claim to have. Pruning bad
1503 // connections might just remove any connections that aren't treating us
1504 // favourably anyway.
1506 // for c := range t.conns {
1507 // if c.sentHave(piece) {
1511 for conn := range t.conns {
1512 if conn.PeerHasPiece(piece) {
1513 conn.updateRequests()
1518 func (t *Torrent) verifyPiece(piece int) {
1521 defer cl.mu.Unlock()
1522 p := &t.pieces[piece]
1525 cl.event.Broadcast()
1527 for p.hashing || t.storage == nil {
1530 if !p.t.piecesQueuedForHash.Remove(piece) {
1531 panic("piece was not queued")
1533 if t.closed.IsSet() || t.pieceComplete(piece) {
1534 t.updatePiecePriority(piece)
1535 log.Println("early return", t.closed.IsSet(), t.pieceComplete(piece))
1539 t.publishPieceChange(piece)
1541 sum := t.hashPiece(piece)
1544 t.pieceHashed(piece, sum == p.hash)
1547 // Return the connections that touched a piece, and clear the entry while
1549 func (t *Torrent) reapPieceTouchers(piece int) (ret []*connection) {
1550 for c := range t.conns {
1551 if _, ok := c.peerTouchedPieces[piece]; ok {
1552 ret = append(ret, c)
1553 delete(c.peerTouchedPieces, piece)
1559 func (t *Torrent) connsAsSlice() (ret []*connection) {
1560 for c := range t.conns {
1561 ret = append(ret, c)
1566 // Currently doesn't really queue, but should in the future.
1567 func (t *Torrent) queuePieceCheck(pieceIndex int) {
1568 piece := &t.pieces[pieceIndex]
1569 if piece.queuedForHash() {
1572 t.piecesQueuedForHash.Add(pieceIndex)
1573 t.publishPieceChange(pieceIndex)
1574 go t.verifyPiece(pieceIndex)
1577 func (t *Torrent) VerifyData() {
1578 for i := range iter.N(t.NumPieces()) {
1579 t.Piece(i).VerifyData()