16 "github.com/anacrolix/missinggo"
17 "github.com/anacrolix/missinggo/bitmap"
18 "github.com/anacrolix/missinggo/itertools"
19 "github.com/anacrolix/missinggo/perf"
20 "github.com/anacrolix/missinggo/pubsub"
21 "github.com/anacrolix/missinggo/slices"
22 "github.com/bradfitz/iter"
24 "github.com/anacrolix/torrent/bencode"
25 "github.com/anacrolix/torrent/metainfo"
26 pp "github.com/anacrolix/torrent/peer_protocol"
27 "github.com/anacrolix/torrent/storage"
28 "github.com/anacrolix/torrent/tracker"
func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
	return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
}
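// Worked example (editorial note, not from the original source): with the
// default 16KiB chunk size, chunk index 2 of a 256KiB piece is the byte
// range [32768, 49152) within that piece; the final chunk of a piece may be
// shorter when the piece length isn't a multiple of the chunk size.
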
type peersKey struct {
	IPBytes string
	Port    int
}
// Maintains the state of a torrent within a Client.
type Torrent struct {
	cl *Client

	closed   missinggo.Event
	infoHash metainfo.Hash
	pieces   []piece
	// Values are the piece indices that changed.
	pieceStateChanges *pubsub.PubSub
	chunkSize         pp.Integer
	// Total length of the torrent in bytes. Stored because it's not O(1) to
	// get this from the info dict.
	length int64

	// The storage to open when the info dict becomes available.
	storageOpener storage.Client
	// Storage for torrent data.
	storage storage.Torrent

	metainfo metainfo.MetaInfo

	// The info dict. nil if we don't have it (yet).
	info *metainfo.InfoEx
	// Active peer connections, running message stream loops.
	conns               []*connection
	maxEstablishedConns int
	// Set of addrs to which we're attempting to connect. Connections are
	// half-open until all handshakes are completed.
	halfOpen map[string]struct{}

	// Reserve of peers to connect to. A peer can be both here and in the
	// active connections if we're told about the peer after connecting with
	// them. That encourages us to reconnect to peers that are well known.
	peers          map[peersKey]Peer
	wantPeersEvent missinggo.Event
	// An announcer for each tracker URL.
	trackerAnnouncers map[string]*trackerScraper
	// How many times we've initiated a DHT announce.
	numDHTAnnounces int

	// Name used if the info name isn't available.
	displayName string
	// The bencoded bytes of the info dict.
	metadataBytes []byte
	// Each element corresponds to one 16KiB metadata piece. True if we have
	// received that piece.
	metadataCompletedChunks []bool

	// Set when .Info is obtained.
	gotMetainfo missinggo.Event

	readers map[*Reader]struct{}

	pendingPieces   bitmap.Bitmap
	completedPieces bitmap.Bitmap

	connPieceInclinationPool sync.Pool
}
func (t *Torrent) setDisplayName(dn string) {
	if t.haveInfo() {
		return
	}
	t.displayName = dn
}
func (t *Torrent) pieceComplete(piece int) bool {
	return t.completedPieces.Get(piece)
}

func (t *Torrent) pieceCompleteUncached(piece int) bool {
	return t.pieces[piece].Storage().GetIsComplete()
}
func (t *Torrent) numConnsUnchoked() (num int) {
	for _, c := range t.conns {
		if !c.PeerChoked {
			num++
		}
	}
	return
}
124 // There's a connection to that address already.
125 func (t *Torrent) addrActive(addr string) bool {
126 if _, ok := t.halfOpen[addr]; ok {
129 for _, c := range t.conns {
130 if c.remoteAddr().String() == addr {
func (t *Torrent) worstUnclosedConns() (ret []*connection) {
	ret = make([]*connection, 0, len(t.conns))
	for _, c := range t.conns {
		if !c.closed.IsSet() {
			ret = append(ret, c)
		}
	}
	return
}
func (t *Torrent) addPeer(p Peer) {
	if len(t.peers) >= torrentPeersHighWater {
		return
	}
	key := peersKey{string(p.IP), p.Port}
	if _, ok := t.peers[key]; ok {
		return
	}
	peersAddedBySource.Add(string(p.Source), 1)
	t.peers[key] = p
}
func (t *Torrent) invalidateMetadata() {
	t.metadataBytes = nil
	t.metadataCompletedChunks = nil
}
func (t *Torrent) saveMetadataPiece(index int, data []byte) {
	if t.haveInfo() {
		return
	}
	if index >= len(t.metadataCompletedChunks) {
		log.Printf("%s: ignoring metadata piece %d", t, index)
		return
	}
	copy(t.metadataBytes[(1<<14)*index:], data)
	t.metadataCompletedChunks[index] = true
}
func (t *Torrent) metadataPieceCount() int {
	return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
}
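// Worked example (editorial): a 45000-byte info dict has
// (45000 + (1<<14) - 1) / (1<<14) = 3 metadata pieces; the last piece is
// 45000 - 2*16384 = 12232 bytes.
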
func (t *Torrent) haveMetadataPiece(piece int) bool {
	if t.haveInfo() {
		return (1<<14)*piece < len(t.metadataBytes)
	}
	return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
}

func (t *Torrent) metadataSizeKnown() bool {
	return t.metadataBytes != nil
}

func (t *Torrent) metadataSize() int {
	return len(t.metadataBytes)
}
func infoPieceHashes(info *metainfo.Info) (ret []string) {
	for i := 0; i < len(info.Pieces); i += 20 {
		ret = append(ret, string(info.Pieces[i:i+20]))
	}
	return
}
// Called when metadata for a torrent becomes available.
func (t *Torrent) setInfoBytes(b []byte) error {
	if t.haveInfo() {
		return nil
	}
	var ie *metainfo.InfoEx
	err := bencode.Unmarshal(b, &ie)
	if err != nil {
		return fmt.Errorf("error unmarshalling info bytes: %s", err)
	}
	if ie.Hash() != t.infoHash {
		return errors.New("info bytes have wrong hash")
	}
	err = validateInfo(&ie.Info)
	if err != nil {
		return fmt.Errorf("bad info: %s", err)
	}
	defer t.updateWantPeersEvent()
	t.info = ie
	t.displayName = "" // Save a few bytes lol.
	t.cl.event.Broadcast()
	t.gotMetainfo.Set()
	t.storage, err = t.storageOpener.OpenTorrent(t.info)
	if err != nil {
		return fmt.Errorf("error opening torrent storage: %s", err)
	}
	for _, f := range t.info.UpvertedFiles() {
		t.length += f.Length
	}
	t.metadataBytes = b
	t.metadataCompletedChunks = nil
	hashes := infoPieceHashes(&t.info.Info)
	t.pieces = make([]piece, len(hashes))
	for i, hash := range hashes {
		piece := &t.pieces[i]
		piece.noPendingWrites.L = &piece.pendingWritesMutex
		missinggo.CopyExact(piece.Hash[:], hash)
	}
	for _, conn := range t.conns {
		if err := conn.setNumPieces(t.numPieces()); err != nil {
			log.Printf("closing connection: %s", err)
			conn.Close()
		}
	}
	for i := range t.pieces {
		t.updatePieceCompletion(i)
		t.pieces[i].QueuedForHash = true
	}
	go func() {
		for i := range t.pieces {
			t.verifyPiece(i)
		}
	}()
	return nil
}

func (t *Torrent) verifyPiece(piece int) {
	t.cl.verifyPiece(t, piece)
}
func (t *Torrent) haveAllMetadataPieces() bool {
	if t.haveInfo() {
		return true
	}
	if t.metadataCompletedChunks == nil {
		return false
	}
	for _, have := range t.metadataCompletedChunks {
		if !have {
			return false
		}
	}
	return true
}
// TODO: Propagate errors to disconnect peer.
func (t *Torrent) setMetadataSize(bytes int64) (err error) {
	if t.haveInfo() {
		// We already know the correct metadata size.
		return
	}
	if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
		return errors.New("bad size")
	}
	if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
		return
	}
	t.metadataBytes = make([]byte, bytes)
	t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
	for _, c := range t.conns {
		c.requestPendingMetadata()
	}
	return
}
// The current working name for the torrent. Either the name in the info
// dict, or a display name given (such as the dn value in a magnet link),
// or "".
func (t *Torrent) name() string {
	if t.haveInfo() {
		return t.info.Name
	}
	return t.displayName
}
func (t *Torrent) pieceState(index int) (ret PieceState) {
	p := &t.pieces[index]
	ret.Priority = t.piecePriority(index)
	if t.pieceComplete(index) {
		ret.Complete = true
	}
	if p.QueuedForHash || p.Hashing {
		ret.Checking = true
	}
	if !ret.Complete && t.piecePartiallyDownloaded(index) {
		ret.Partial = true
	}
	return
}
func (t *Torrent) metadataPieceSize(piece int) int {
	return metadataPieceSize(len(t.metadataBytes), piece)
}
func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
	d := map[string]int{
		"msg_type": msgType,
		"piece":    piece,
	}
	if data != nil {
		d["total_size"] = len(t.metadataBytes)
	}
	p, err := bencode.Marshal(d)
	if err != nil {
		panic(err)
	}
	return pp.Message{
		Type:            pp.Extended,
		ExtendedID:      c.PeerExtensionIDs["ut_metadata"],
		ExtendedPayload: append(p, data...),
	}
}
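// Editorial note (BEP 9): for a data message, a header such as
// {"msg_type": 1, "piece": 0, "total_size": 45000} bencodes to
// "d8:msg_typei1e5:piecei0e10:total_sizei45000ee", and the raw 16KiB block
// is appended directly after it, as append(p, data...) does above.
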
func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
	rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
		ret = append(ret, PieceStateRun{
			PieceState: el.(PieceState),
			Length:     int(count),
		})
	})
	for index := range t.pieces {
		rle.Append(t.pieceState(index), 1)
	}
	rle.Flush()
	return
}
// Produces a small string representing a PieceStateRun.
func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
	ret = fmt.Sprintf("%d", psr.Length)
	ret += func() string {
		switch psr.Priority {
		case PiecePriorityNext:
			return "N"
		case PiecePriorityNormal:
			return "."
		case PiecePriorityReadahead:
			return "R"
		case PiecePriorityNow:
			return "!"
		default:
			return ""
		}
	}()
	if psr.Checking {
		ret += "H"
	}
	if psr.Partial {
		ret += "P"
	}
	if psr.Complete {
		ret += "C"
	}
	return
}
func (t *Torrent) writeStatus(w io.Writer, cl *Client) {
	fmt.Fprintf(w, "Infohash: %x\n", t.infoHash)
	fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
	if !t.haveInfo() {
		fmt.Fprintf(w, "Metadata have: ")
		for _, h := range t.metadataCompletedChunks {
			fmt.Fprintf(w, "%c", func() rune {
				if h {
					return 'H'
				}
				return '.'
			}())
		}
		fmt.Fprintln(w)
	}
	fmt.Fprintf(w, "Piece length: %s\n", func() string {
		if t.haveInfo() {
			return fmt.Sprint(t.usualPieceSize())
		}
		return "?"
	}())
	if t.haveInfo() {
		fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
		fmt.Fprint(w, "Piece States:")
		for _, psr := range t.pieceStateRuns() {
			w.Write([]byte(" "))
			w.Write([]byte(pieceStateRunStatusChars(psr)))
		}
		fmt.Fprintln(w)
	}
	fmt.Fprintf(w, "Reader Pieces:")
	t.forReaderOffsetPieces(func(begin, end int) (again bool) {
		fmt.Fprintf(w, " %d:%d", begin, end)
		return true
	})
	fmt.Fprintln(w)
	fmt.Fprintf(w, "Trackers: ")
	for _url := range t.trackerAnnouncers {
		fmt.Fprintf(w, "%q ", _url)
	}
	fmt.Fprintln(w)
	fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
	fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
	fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
	fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
	slices.Sort(t.conns, worseConn)
	for i, c := range t.conns {
		fmt.Fprintf(w, "%2d. ", i+1)
		c.WriteStatus(w, t)
	}
}
func (t *Torrent) haveInfo() bool {
	return t.info != nil
}
// TODO: Include URIs that weren't converted to tracker clients.
func (t *Torrent) announceList() (al [][]string) {
	return t.metainfo.AnnounceList
}
// Returns a run-time generated MetaInfo that includes the info bytes and
// announce-list as currently known to the client.
func (t *Torrent) newMetaInfo() (mi *metainfo.MetaInfo) {
	mi = &metainfo.MetaInfo{
		CreationDate: time.Now().Unix(),
		Comment:      "dynamic metainfo from client",
		CreatedBy:    "go.torrent",
		AnnounceList: t.announceList(),
	}
	if t.haveInfo() {
		mi.Info = *t.info
	}
	return
}
func (t *Torrent) bytesLeft() (left int64) {
	for i := 0; i < t.numPieces(); i++ {
		left += int64(t.pieces[i].bytesLeft())
	}
	return
}
// Bytes left to give in tracker announces.
func (t *Torrent) bytesLeftAnnounce() uint64 {
	if t.haveInfo() {
		return uint64(t.bytesLeft())
	} else {
		return math.MaxUint64
	}
}
func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
	if t.pieceComplete(piece) {
		return false
	}
	if t.pieceAllDirty(piece) {
		return false
	}
	return t.pieces[piece].hasDirtyChunks()
}
func (t *Torrent) usualPieceSize() int {
	return int(t.info.PieceLength)
}

func (t *Torrent) lastPieceSize() int {
	return int(t.pieceLength(t.numPieces() - 1))
}

func (t *Torrent) numPieces() int {
	return t.info.NumPieces()
}

func (t *Torrent) numPiecesCompleted() (num int) {
	return t.completedPieces.Len()
}
func (t *Torrent) close() (err error) {
	t.closed.Set()
	if c, ok := t.storage.(io.Closer); ok {
		c.Close()
	}
	for _, conn := range t.conns {
		conn.Close()
	}
	t.pieceStateChanges.Close()
	t.updateWantPeersEvent()
	return
}
func (t *Torrent) requestOffset(r request) int64 {
	return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
}

// Return the request that would include the given offset into the torrent
// data. Returns !ok if there is no such request.
func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
	return torrentOffsetRequest(t.length, t.info.PieceLength, int64(t.chunkSize), off)
}
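// exampleOffsetRequest is an editorial sketch (not part of the original
// file) of the mapping offsetRequest delegates to torrentOffsetRequest:
// pick the piece containing off, then the chunk-aligned begin within it.
// For example, with 32KiB pieces and 16KiB chunks, offset 40000 falls in
// piece 1 at piece-relative offset 7232, i.e. the chunk beginning at 0.
func exampleOffsetRequest(pieceSize, chunkSize, off int64) (index, begin int64) {
	index = off / pieceSize                         // piece containing off
	begin = off % pieceSize / chunkSize * chunkSize // chunk-aligned start within the piece
	return
}
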
func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
	tr := perf.NewTimer()
	n, err := t.pieces[piece].Storage().WriteAt(data, begin)
	if err == nil && n != len(data) {
		err = io.ErrShortWrite
	}
	if err == nil {
		tr.Stop("write chunk")
	}
	return
}
func (t *Torrent) bitfield() (bf []bool) {
	bf = make([]bool, t.numPieces())
	t.completedPieces.IterTyped(func(piece int) (again bool) {
		bf[piece] = true
		return true
	})
	return
}
func (t *Torrent) validOutgoingRequest(r request) bool {
	if r.Index >= pp.Integer(t.info.NumPieces()) {
		return false
	}
	if r.Begin%t.chunkSize != 0 {
		return false
	}
	if r.Length > t.chunkSize {
		return false
	}
	pieceLength := t.pieceLength(int(r.Index))
	if r.Begin+r.Length > pieceLength {
		return false
	}
	return r.Length == t.chunkSize || r.Begin+r.Length == pieceLength
}
func (t *Torrent) pieceChunks(piece int) (css []chunkSpec) {
	css = make([]chunkSpec, 0, (t.pieceLength(piece)+t.chunkSize-1)/t.chunkSize)
	var cs chunkSpec
	for left := t.pieceLength(piece); left != 0; left -= cs.Length {
		cs.Length = left
		if cs.Length > t.chunkSize {
			cs.Length = t.chunkSize
		}
		css = append(css, cs)
		cs.Begin += cs.Length
	}
	return
}
func (t *Torrent) pieceNumChunks(piece int) int {
	return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
}

func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
	t.pieces[pieceIndex].DirtyChunks.Clear()
}
type Peer struct {
	IP     net.IP
	Port   int
	Source peerSource
	// Peer is known to support encryption.
	SupportsEncryption bool
}
func (t *Torrent) pieceLength(piece int) (len_ pp.Integer) {
	if piece < 0 || piece >= t.info.NumPieces() {
		return
	}
	if piece == t.numPieces()-1 {
		len_ = pp.Integer(t.length % t.info.PieceLength)
	}
	if len_ == 0 {
		len_ = pp.Integer(t.info.PieceLength)
	}
	return
}
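// Worked example (editorial): a 1000000-byte torrent with 262144-byte
// pieces has 4 pieces, and pieceLength(3) = 1000000 % 262144 = 213568.
// When the total length is an exact multiple of the piece length, the
// modulo is 0 and the len_ == 0 branch restores the full piece length.
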
func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
	hash := pieceHash.New()
	p := &t.pieces[piece]
	p.waitNoPendingWrites()
	ip := t.info.Piece(piece)
	pl := ip.Length()
	n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
	if n == pl {
		missinggo.CopyExact(&ret, hash.Sum(nil))
		return
	}
	if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
		log.Printf("unexpected error hashing piece with %T: %s", t.storage, err)
	}
	return
}
func (t *Torrent) haveAllPieces() bool {
	if !t.haveInfo() {
		return false
	}
	return t.completedPieces.Len() == t.numPieces()
}

func (t *Torrent) haveAnyPieces() bool {
	for i := range t.pieces {
		if t.pieceComplete(i) {
			return true
		}
	}
	return false
}

func (t *Torrent) havePiece(index int) bool {
	return t.haveInfo() && t.pieceComplete(index)
}
func (t *Torrent) haveChunk(r request) (ret bool) {
	// defer func() {
	// 	log.Println("have chunk", r, ret)
	// }()
	if !t.haveInfo() {
		return false
	}
	if t.pieceComplete(int(r.Index)) {
		return true
	}
	p := &t.pieces[r.Index]
	return !p.pendingChunk(r.chunkSpec, t.chunkSize)
}

func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
	return int(cs.Begin / chunkSize)
}
func (t *Torrent) wantPiece(r request) bool {
	if !t.wantPieceIndex(int(r.Index)) {
		return false
	}
	if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
		return true
	}
	// TODO: What about pieces that were wanted, but aren't now, and aren't
	// completed either? That used to be done here.
	return false
}
func (t *Torrent) wantPieceIndex(index int) bool {
	if !t.haveInfo() {
		return false
	}
	p := &t.pieces[index]
	if p.QueuedForHash {
		return false
	}
	if p.Hashing {
		return false
	}
	if t.pieceComplete(index) {
		return false
	}
	if t.pendingPieces.Contains(index) {
		return true
	}
	return !t.forReaderOffsetPieces(func(begin, end int) bool {
		return index < begin || index >= end
	})
}
func (t *Torrent) forNeededPieces(f func(piece int) (more bool)) (all bool) {
	return t.forReaderOffsetPieces(func(begin, end int) (more bool) {
		for i := begin; i < end; i++ {
			if !t.havePiece(i) && !f(i) {
				return false
			}
		}
		return true
	})
}
func (t *Torrent) connHasWantedPieces(c *connection) bool {
	return !c.pieceRequestOrder.IsEmpty()
}

func (t *Torrent) extentPieces(off, _len int64) (pieces []int) {
	for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
		pieces = append(pieces, int(i))
	}
	return
}
// The worst connection is one that hasn't been sent, or hasn't sent,
// anything useful for the longest. A bad connection is one that usually
// sends us unwanted pieces, or has been in the worse half of the
// established connections for more than a minute.
func (t *Torrent) worstBadConn() *connection {
	wcs := slices.AsHeap(t.worstUnclosedConns(), worseConn)
	for wcs.Len() != 0 {
		c := heap.Pop(wcs).(*connection)
		if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
			return c
		}
		if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
			// Give connections 1 minute to prove themselves.
			if time.Since(c.completedHandshake) > time.Minute {
				return c
			}
		}
	}
	return nil
}
type PieceStateChange struct {
	Index int
	PieceState
}

func (t *Torrent) publishPieceChange(piece int) {
	cur := t.pieceState(piece)
	p := &t.pieces[piece]
	if cur != p.PublicPieceState {
		p.PublicPieceState = cur
		t.pieceStateChanges.Publish(PieceStateChange{
			piece,
			cur,
		})
	}
}
func (t *Torrent) pieceNumPendingChunks(piece int) int {
	if t.pieceComplete(piece) {
		return 0
	}
	return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
}

func (t *Torrent) pieceAllDirty(piece int) bool {
	return t.pieces[piece].DirtyChunks.Len() == t.pieceNumChunks(piece)
}
func (t *Torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
	return t.forReaderOffsetPieces(func(begin, end int) (again bool) {
		if begin < end {
			if !f(begin) {
				return false
			}
		}
		return true
	})
}
func (t *Torrent) readersChanged() {
	t.updatePiecePriorities()
}

func (t *Torrent) maybeNewConns() {
	// Tickle the accept routine.
	t.cl.event.Broadcast()
	t.openNewConns()
}

func (t *Torrent) piecePriorityChanged(piece int) {
	for _, c := range t.conns {
		c.updatePiecePriority(piece)
	}
	t.maybeNewConns()
	t.publishPieceChange(piece)
}
func (t *Torrent) updatePiecePriority(piece int) bool {
	p := &t.pieces[piece]
	newPrio := t.piecePriorityUncached(piece)
	if newPrio == p.priority {
		return false
	}
	p.priority = newPrio
	return true
}
// Update all piece priorities in one hit. This function should have the same
// output as updatePiecePriority, but across all pieces.
func (t *Torrent) updatePiecePriorities() {
	newPrios := make([]piecePriority, t.numPieces())
	t.pendingPieces.IterTyped(func(piece int) (more bool) {
		newPrios[piece] = PiecePriorityNormal
		return true
	})
	t.forReaderOffsetPieces(func(begin, end int) (next bool) {
		if begin < end {
			newPrios[begin].Raise(PiecePriorityNow)
		}
		for i := begin + 1; i < end; i++ {
			newPrios[i].Raise(PiecePriorityReadahead)
		}
		return true
	})
	t.completedPieces.IterTyped(func(piece int) (more bool) {
		newPrios[piece] = PiecePriorityNone
		return true
	})
	for i, prio := range newPrios {
		if prio != t.pieces[i].priority {
			t.pieces[i].priority = prio
			t.piecePriorityChanged(i)
		}
	}
}
func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
	if off >= t.length {
		return
	}
	if off < 0 {
		size += off
		off = 0
	}
	if size <= 0 {
		return
	}
	begin = int(off / t.info.PieceLength)
	end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
	if end > t.info.NumPieces() {
		end = t.info.NumPieces()
	}
	return
}
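// Worked example (editorial): off = 1 MiB and size = 1 MiB with 256KiB
// pieces gives begin = 4 (1048576/262144) and end = 8
// ((1048576+1048576+262143)/262144), i.e. the half-open piece range [4, 8).
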
// Returns true if all iterations complete without breaking.
func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
	// There's an opportunity here to build a map of beginning pieces, and a
	// bitmap of the rest. I wonder if it's worth the allocation overhead.
	for r := range t.readers {
		r.mu.Lock()
		pos, readahead := r.pos, r.readahead
		r.mu.Unlock()
		if readahead < 1 {
			readahead = 1
		}
		begin, end := t.byteRegionPieces(pos, readahead)
		if begin >= end {
			continue
		}
		if !f(begin, end) {
			return false
		}
	}
	return true
}
func (t *Torrent) piecePriority(piece int) piecePriority {
	if !t.haveInfo() {
		return PiecePriorityNone
	}
	return t.pieces[piece].priority
}
func (t *Torrent) piecePriorityUncached(piece int) (ret piecePriority) {
	ret = PiecePriorityNone
	if t.pieceComplete(piece) {
		return
	}
	if t.pendingPieces.Contains(piece) {
		ret = PiecePriorityNormal
	}
	raiseRet := ret.Raise
	t.forReaderOffsetPieces(func(begin, end int) (again bool) {
		if piece == begin {
			raiseRet(PiecePriorityNow)
		}
		if begin <= piece && piece < end {
			raiseRet(PiecePriorityReadahead)
		}
		return true
	})
	return
}
func (t *Torrent) pendPiece(piece int) {
	if t.pendingPieces.Contains(piece) {
		return
	}
	if t.havePiece(piece) {
		return
	}
	t.pendingPieces.Add(piece)
	if !t.updatePiecePriority(piece) {
		return
	}
	t.piecePriorityChanged(piece)
}
func (t *Torrent) getCompletedPieces() (ret bitmap.Bitmap) {
	return t.completedPieces.Copy()
}

func (t *Torrent) unpendPieces(unpend *bitmap.Bitmap) {
	t.pendingPieces.Sub(unpend)
	t.updatePiecePriorities()
}

func (t *Torrent) pendPieceRange(begin, end int) {
	for i := begin; i < end; i++ {
		t.pendPiece(i)
	}
}

func (t *Torrent) unpendPieceRange(begin, end int) {
	var bm bitmap.Bitmap
	bm.AddRange(begin, end)
	t.unpendPieces(&bm)
}
func (t *Torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) {
	if !c.PeerHasPiece(piece) {
		return true
	}
	chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
	return itertools.ForPerm(len(chunkIndices), func(i int) bool {
		req := request{pp.Integer(piece), t.chunkIndexSpec(chunkIndices[i], piece)}
		return c.Request(req)
	})
}
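// Editorial note: itertools.ForPerm visits the chunk indices in a random
// permutation, spreading requests across a piece instead of always
// requesting chunks in ascending order.
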
func (t *Torrent) pendRequest(req request) {
	ci := chunkIndex(req.chunkSpec, t.chunkSize)
	t.pieces[req.Index].pendChunkIndex(ci)
}

func (t *Torrent) pieceChanged(piece int) {
	t.cl.pieceChanged(t, piece)
}

func (t *Torrent) openNewConns() {
	t.cl.openNewConns(t)
}
func (t *Torrent) getConnPieceInclination() []int {
	_ret := t.connPieceInclinationPool.Get()
	if _ret == nil {
		pieceInclinationsNew.Add(1)
		return rand.Perm(t.numPieces())
	}
	pieceInclinationsReused.Add(1)
	return _ret.([]int)
}

func (t *Torrent) putPieceInclination(pi []int) {
	t.connPieceInclinationPool.Put(pi)
	pieceInclinationsPut.Add(1)
}
func (t *Torrent) updatePieceCompletion(piece int) {
	pcu := t.pieceCompleteUncached(piece)
	changed := t.completedPieces.Get(piece) != pcu
	t.completedPieces.Set(piece, pcu)
	if changed {
		t.pieceChanged(piece)
	}
}
// Non-blocking read. Client lock is not required.
func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
	p := &t.pieces[off/t.info.PieceLength]
	p.waitNoPendingWrites()
	return p.Storage().ReadAt(b, off-p.Info().Offset())
}
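// Worked example (editorial): with 262144-byte pieces, readAt(b, 600000)
// touches piece 2 (600000/262144 = 2) and reads starting 600000 - 524288 =
// 75712 bytes into that piece's storage.
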
func (t *Torrent) updateAllPieceCompletions() {
	for i := range iter.N(t.numPieces()) {
		t.updatePieceCompletion(i)
	}
}
func (t *Torrent) maybeMetadataCompleted() {
	if t.haveInfo() {
		// Nothing to do.
		return
	}
	if !t.haveAllMetadataPieces() {
		// Don't have enough metadata pieces.
		return
	}
	// TODO(anacrolix): If this fails, I think something harsher should be
	// done.
	err := t.setInfoBytes(t.metadataBytes)
	if err != nil {
		log.Printf("error setting metadata: %s", err)
		t.invalidateMetadata()
		return
	}
	if t.cl.config.Debug {
		log.Printf("%s: got metadata from peers", t)
	}
}
func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
	t.forReaderOffsetPieces(func(begin, end int) bool {
		ret.AddRange(begin, end)
		return true
	})
	return
}
func (t *Torrent) needData() bool {
	if !t.haveInfo() {
		return true
	}
	if t.pendingPieces.Len() != 0 {
		return true
	}
	return !t.readerPieces().IterTyped(func(piece int) bool {
		return t.pieceComplete(piece)
	})
}
func appendMissingStrings(old, new []string) (ret []string) {
	ret = old
new:
	for _, n := range new {
		for _, o := range old {
			if o == n {
				continue new
			}
		}
		ret = append(ret, n)
	}
	return
}

func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
	ret = existing
	for minNumTiers > len(ret) {
		ret = append(ret, nil)
	}
	return
}
func (t *Torrent) addTrackers(announceList [][]string) {
	fullAnnounceList := &t.metainfo.AnnounceList
	t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
	for tierIndex, trackerURLs := range announceList {
		(*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
	}
	t.startMissingTrackerScrapers()
	t.updateWantPeersEvent()
}
1092 // Don't call this before the info is available.
1093 func (t *Torrent) bytesCompleted() int64 {
1097 return t.info.TotalLength() - t.bytesLeft()
1100 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1102 defer t.cl.mu.Unlock()
1103 return t.setInfoBytes(b)
// Returns true if connection is removed from torrent.Conns.
func (t *Torrent) deleteConnection(c *connection) bool {
	for i0, _c := range t.conns {
		if _c != c {
			continue
		}
		i1 := len(t.conns) - 1
		if i0 != i1 {
			t.conns[i0] = t.conns[i1]
		}
		t.conns = t.conns[:i1]
		return true
	}
	return false
}

func (t *Torrent) dropConnection(c *connection) {
	t.cl.event.Broadcast()
	c.Close()
	if t.deleteConnection(c) {
		t.openNewConns()
	}
}
func (t *Torrent) wantPeers() bool {
	if t.closed.IsSet() {
		return false
	}
	if len(t.peers) > torrentPeersLowWater {
		return false
	}
	return t.needData() || t.seeding()
}

func (t *Torrent) updateWantPeersEvent() {
	if t.wantPeers() {
		t.wantPeersEvent.Set()
	} else {
		t.wantPeersEvent.Clear()
	}
}
// Returns whether the client should make effort to seed the torrent.
func (t *Torrent) seeding() bool {
	cl := t.cl
	if cl.config.NoUpload {
		return false
	}
	if !cl.config.Seed {
		return false
	}
	if t.needData() {
		return false
	}
	return true
}
// Adds and starts tracker scrapers for tracker URLs that aren't already
// running.
func (t *Torrent) startMissingTrackerScrapers() {
	if t.cl.config.DisableTrackers {
		return
	}
	for _, tier := range t.announceList() {
		for _, trackerURL := range tier {
			if _, ok := t.trackerAnnouncers[trackerURL]; ok {
				continue
			}
			newAnnouncer := &trackerScraper{
				url: trackerURL,
				t:   t,
			}
			if t.trackerAnnouncers == nil {
				t.trackerAnnouncers = make(map[string]*trackerScraper)
			}
			t.trackerAnnouncers[trackerURL] = newAnnouncer
			go newAnnouncer.Run()
		}
	}
}
// Returns an AnnounceRequest with fields filled out to defaults and current
// values.
func (t *Torrent) announceRequest() tracker.AnnounceRequest {
	return tracker.AnnounceRequest{
		Event:    tracker.None,
		NumWant:  -1,
		Port:     uint16(t.cl.incomingPeerPort()),
		PeerId:   t.cl.peerID,
		InfoHash: t.infoHash,
		Left:     t.bytesLeftAnnounce(),
	}
}
func (t *Torrent) announceDHT(impliedPort bool) {
	cl := t.cl
	for {
		select {
		case <-t.wantPeersEvent.LockedChan(&cl.mu):
		case <-t.closed.LockedChan(&cl.mu):
			return
		}
		// log.Printf("getting peers for %q from DHT", t)
		ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
		if err != nil {
			log.Printf("error getting peers from dht: %s", err)
			continue
		}
		cl.mu.Lock()
		t.numDHTAnnounces++
		cl.mu.Unlock()
		// Count all the unique addresses we got during this announce.
		allAddrs := make(map[string]struct{})
	getPeers:
		for {
			select {
			case v, ok := <-ps.Peers:
				if !ok {
					break getPeers
				}
				addPeers := make([]Peer, 0, len(v.Peers))
				for _, cp := range v.Peers {
					if cp.Port == 0 {
						// Can't do anything with this.
						continue
					}
					addPeers = append(addPeers, Peer{
						IP:     cp.IP[:],
						Port:   cp.Port,
						Source: peerSourceDHT,
					})
					key := (&net.UDPAddr{
						IP:   cp.IP[:],
						Port: cp.Port,
					}).String()
					allAddrs[key] = struct{}{}
				}
				cl.mu.Lock()
				t.addPeers(addPeers)
				numPeers := len(t.peers)
				cl.mu.Unlock()
				if numPeers >= torrentPeersHighWater {
					break getPeers
				}
			case <-t.closed.LockedChan(&cl.mu):
				break getPeers
			}
		}
		ps.Close()
		// log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
	}
}
func (t *Torrent) addPeers(peers []Peer) {
	for _, p := range peers {
		if t.cl.badPeerIPPort(p.IP, p.Port) {
			continue
		}
		t.addPeer(p)
	}
}

func (t *Torrent) Stats() TorrentStats {
	t.cl.mu.Lock()
	defer t.cl.mu.Unlock()
	return t.stats()
}
// Returns true if the connection is added.
func (t *Torrent) addConnection(c *connection) bool {
	if t.cl.closed.IsSet() {
		return false
	}
	if t.closed.IsSet() {
		return false
	}
	for _, c0 := range t.conns {
		if c.PeerID == c0.PeerID {
			// Already connected to a client with that ID.
			duplicateClientConns.Add(1)
			return false
		}
	}
	if len(t.conns) >= t.maxEstablishedConns {
		c := t.worstBadConn()
		if c == nil {
			return false
		}
		if t.cl.config.Debug && missinggo.CryHeard() {
			log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
		}
		c.Close()
		t.deleteConnection(c)
	}
	if len(t.conns) >= t.maxEstablishedConns {
		panic(len(t.conns))
	}
	t.conns = append(t.conns, c)
	return true
}
func (t *Torrent) wantConns() bool {
	if !t.seeding() && !t.needData() {
		return false
	}
	if len(t.conns) < t.maxEstablishedConns {
		return true
	}
	return t.worstBadConn() != nil
}
func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
	t.cl.mu.Lock()
	defer t.cl.mu.Unlock()
	oldMax = t.maxEstablishedConns
	t.maxEstablishedConns = max
	wcs := slices.AsHeap(append([]*connection(nil), t.conns...), worseConn)
	for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
		t.dropConnection(wcs.Pop().(*connection))
	}
	return
}