19 "github.com/anacrolix/dht"
20 "github.com/anacrolix/missinggo"
21 "github.com/anacrolix/missinggo/bitmap"
22 "github.com/anacrolix/missinggo/perf"
23 "github.com/anacrolix/missinggo/pubsub"
24 "github.com/anacrolix/missinggo/slices"
25 "github.com/bradfitz/iter"
27 "github.com/anacrolix/torrent/bencode"
28 "github.com/anacrolix/torrent/metainfo"
29 pp "github.com/anacrolix/torrent/peer_protocol"
30 "github.com/anacrolix/torrent/storage"
31 "github.com/anacrolix/torrent/tracker"
// chunkIndexSpec maps a chunk index within the given piece to its byte-range
// chunkSpec, delegating to the package-level chunkIndexSpec helper with this
// torrent's piece length and configured chunk size.
// NOTE(review): closing brace elided in this listing view.
34 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
35 return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
38 type peersKey struct {
43 // Maintains state of torrent within a Client.
47 networkingEnabled bool
50 closed missinggo.Event
51 infoHash metainfo.Hash
53 // Values are the piece indices that changed.
54 pieceStateChanges *pubsub.PubSub
55 // The size of chunks to request from peers over the wire. This is
56 // normally 16KiB by convention these days.
59 // Total length of the torrent in bytes. Stored because it's not O(1) to
60 // get this from the info dict.
63 // The storage to open when the info dict becomes available.
64 storageOpener *storage.Client
65 // Storage for torrent data.
66 storage *storage.Torrent
67 // Read-locked for using storage, and write-locked for Closing.
68 storageLock sync.RWMutex
70 metainfo metainfo.MetaInfo
72 // The info dict. nil if we don't have it (yet).
75 // Active peer connections, running message stream loops.
76 conns map[*connection]struct{}
77 maxEstablishedConns int
78 // Set of addrs to which we're attempting to connect. Connections are
79 // half-open until all handshakes are completed.
80 halfOpen map[string]Peer
81 fastestConn *connection
83 // Reserve of peers to connect to. A peer can be both here and in the
84 // active connections if were told about the peer after connecting with
85 // them. That encourages us to reconnect to peers that are well known in
87 peers map[peersKey]Peer
88 wantPeersEvent missinggo.Event
89 // An announcer for each tracker URL.
90 trackerAnnouncers map[string]*trackerScraper
91 // How many times we've initiated a DHT announce. TODO: Move into stats.
94 // Name used if the info name isn't available. Should be cleared when the
95 // Info does become available.
98 // The bencoded bytes of the info dict. This is actively manipulated if
99 // the info bytes aren't initially available, and we try to fetch them
102 // Each element corresponds to the 16KiB metadata pieces. If true, we have
103 // received that piece.
104 metadataCompletedChunks []bool
105 metadataChanged sync.Cond
107 // Set when .Info is obtained.
108 gotMetainfo missinggo.Event
110 readers map[*reader]struct{}
111 readerNowPieces bitmap.Bitmap
112 readerReadaheadPieces bitmap.Bitmap
114 // The indexes of pieces we want with normal priority, that aren't
115 // currently available.
116 pendingPieces bitmap.Bitmap
117 // A cache of completed piece indices.
118 completedPieces bitmap.Bitmap
119 // Pieces that need to be hashed.
120 piecesQueuedForHash bitmap.Bitmap
122 // A pool of piece priorities []int for assignment to new connections.
123 // These "inclinations" are used to give connections preference for
125 connPieceInclinationPool sync.Pool
126 // Torrent-level statistics.
130 // Returns a channel that is closed when the Torrent is closed.
// Closed returns a channel that is closed when the Torrent is closed.
// The channel is derived from the closed event, guarded by the client mutex.
131 func (t *Torrent) Closed() <-chan struct{} {
132 return t.closed.LockedChan(&t.cl.mu)
135 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
136 // pending, and half-open peers.
// NOTE(review): several interior lines (error checks, struct fields, loop
// closers) are elided in this listing; code left untouched.
137 func (t *Torrent) KnownSwarm() (ks []Peer) {
138 // Add pending peers to the list
139 for _, peer := range t.peers {
140 ks = append(ks, peer)
143 // Add half-open peers to the list
144 for _, peer := range t.halfOpen {
145 ks = append(ks, peer)
148 // Add active peers to the list
149 for conn := range t.conns {
// Split the remote address into host/port so a Peer value can be synthesized
// from the live connection.
150 host, portString, err := net.SplitHostPort(conn.remoteAddr().String())
155 ip := net.ParseIP(host)
156 port, err := strconv.Atoi(portString)
161 ks = append(ks, Peer{
165 Source: conn.Discovery,
166 // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
167 // > But if we're not connected to them with an encrypted connection, I couldn't say
168 // > what's appropriate. We can carry forward the SupportsEncryption value as we
169 // > received it from trackers/DHT/PEX, or just use the encryption state for the
170 // > connection. It's probably easiest to do the latter for now.
171 // https://github.com/anacrolix/torrent/pull/188
172 SupportsEncryption: conn.headerEncrypted,
// setChunkSize records the wire chunk size and (re)builds the chunk buffer
// pool so pooled buffers match the new size.
// NOTE(review): assignment of t.chunkSize and pool-closure tail elided in this view.
179 func (t *Torrent) setChunkSize(size pp.Integer) {
181 t.chunkPool = &sync.Pool{
182 New: func() interface{} {
// Each pooled buffer is allocated at exactly the chunk size.
183 b := make([]byte, size)
189 func (t *Torrent) setDisplayName(dn string) {
// pieceComplete reports whether the piece is complete according to the
// cached completedPieces bitmap (does not consult storage directly).
196 func (t *Torrent) pieceComplete(piece int) bool {
197 return t.completedPieces.Get(piece)
// pieceCompleteUncached queries the storage layer directly for the piece's
// completion state, bypassing the completedPieces cache.
200 func (t *Torrent) pieceCompleteUncached(piece int) storage.Completion {
201 return t.pieces[piece].Storage().Completion()
204 // There's a connection to that address already.
205 func (t *Torrent) addrActive(addr string) bool {
206 if _, ok := t.halfOpen[addr]; ok {
209 for c := range t.conns {
214 if ra.String() == addr {
221 func (t *Torrent) unclosedConnsAsSlice() (ret []*connection) {
222 ret = make([]*connection, 0, len(t.conns))
223 for c := range t.conns {
224 if !c.closed.IsSet() {
231 func (t *Torrent) addPeer(p Peer) {
234 if len(t.peers) >= cl.config.TorrentPeersHighWater {
237 key := peersKey{string(p.IP), p.Port}
238 if _, ok := t.peers[key]; ok {
242 peersAddedBySource.Add(string(p.Source), 1)
247 func (t *Torrent) invalidateMetadata() {
248 for i := range t.metadataCompletedChunks {
249 t.metadataCompletedChunks[i] = false
// saveMetadataPiece stores one received 16KiB metadata (ut_metadata) piece
// into metadataBytes and marks it completed. Out-of-range indices are logged
// and ignored.
254 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
258 if index >= len(t.metadataCompletedChunks) {
259 log.Printf("%s: ignoring metadata piece %d", t, index)
// 1<<14 == 16KiB, the fixed metadata piece size from the extension protocol.
262 copy(t.metadataBytes[(1<<14)*index:], data)
263 t.metadataCompletedChunks[index] = true
// metadataPieceCount returns the number of 16KiB metadata pieces, rounding
// the final partial piece up.
266 func (t *Torrent) metadataPieceCount() int {
267 return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
270 func (t *Torrent) haveMetadataPiece(piece int) bool {
272 return (1<<14)*piece < len(t.metadataBytes)
274 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
278 func (t *Torrent) metadataSizeKnown() bool {
279 return t.metadataBytes != nil
282 func (t *Torrent) metadataSize() int {
283 return len(t.metadataBytes)
// infoPieceHashes splits the concatenated SHA-1 digests in info.Pieces into
// one string per piece hash (each sha1.Size == 20 bytes).
286 func infoPieceHashes(info *metainfo.Info) (ret []string) {
287 for i := 0; i < len(info.Pieces); i += sha1.Size {
288 ret = append(ret, string(info.Pieces[i:i+sha1.Size]))
293 func (t *Torrent) makePieces() {
294 hashes := infoPieceHashes(t.info)
295 t.pieces = make([]Piece, len(hashes))
296 for i, hash := range hashes {
297 piece := &t.pieces[i]
300 piece.noPendingWrites.L = &piece.pendingWritesMutex
301 missinggo.CopyExact(piece.hash[:], hash)
305 // Called when metadata for a torrent becomes available.
// Validates the bencoded info dict against the expected infohash, unmarshals
// and sanity-checks it, opens storage, sizes each connection for the piece
// count, and refreshes cached piece completion state.
// NOTE(review): many interior lines (error returns, piece construction,
// length accumulation) are elided in this listing; code left untouched.
306 func (t *Torrent) setInfoBytes(b []byte) error {
310 if metainfo.HashBytes(b) != t.infoHash {
311 return errors.New("info bytes have wrong hash")
313 var info metainfo.Info
314 err := bencode.Unmarshal(b, &info)
316 return fmt.Errorf("error unmarshalling info bytes: %s", err)
318 err = validateInfo(&info)
320 return fmt.Errorf("bad info: %s", err)
322 defer t.updateWantPeersEvent()
324 t.displayName = "" // Save a few bytes lol.
325 t.cl.event.Broadcast()
327 t.storage, err = t.storageOpener.OpenTorrent(t.info, t.infoHash)
329 return fmt.Errorf("error opening torrent storage: %s", err)
332 for _, f := range t.info.UpvertedFiles() {
// Metadata is now fully known; the per-chunk completion tracking is obsolete.
337 t.metadataCompletedChunks = nil
339 for conn := range t.conns {
340 if err := conn.setNumPieces(t.numPieces()); err != nil {
341 log.Printf("closing connection: %s", err)
345 for i := range t.pieces {
346 t.updatePieceCompletion(i)
348 if !p.storageCompletionOk {
349 // log.Printf("piece %s completion unknown, queueing check", p)
356 func (t *Torrent) haveAllMetadataPieces() bool {
360 if t.metadataCompletedChunks == nil {
363 for _, have := range t.metadataCompletedChunks {
371 // TODO: Propagate errors to disconnect peer.
// setMetadataSize allocates metadataBytes and the per-chunk completion
// slice for an announced ut_metadata total size, rejecting absurd sizes,
// then wakes waiters and asks each connection for pending metadata pieces.
372 func (t *Torrent) setMetadataSize(bytes int64) (err error) {
374 // We already know the correct metadata size.
377 if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
378 return errors.New("bad size")
// Size unchanged from what we already allocated: nothing to do.
380 if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
383 t.metadataBytes = make([]byte, bytes)
384 t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
385 t.metadataChanged.Broadcast()
386 for c := range t.conns {
387 c.requestPendingMetadata()
392 // The current working name for the torrent. Either the name in the info dict,
393 // or a display name given such as by the dn value in a magnet link, or "".
394 func (t *Torrent) name() string {
401 func (t *Torrent) pieceState(index int) (ret PieceState) {
402 p := &t.pieces[index]
403 ret.Priority = t.piecePriority(index)
404 if t.pieceComplete(index) {
407 if p.queuedForHash() || p.hashing {
410 if !ret.Complete && t.piecePartiallyDownloaded(index) {
416 func (t *Torrent) metadataPieceSize(piece int) int {
417 return metadataPieceSize(len(t.metadataBytes), piece)
420 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
426 d["total_size"] = len(t.metadataBytes)
428 p, err := bencode.Marshal(d)
434 ExtendedID: c.PeerExtensionIDs["ut_metadata"],
435 ExtendedPayload: append(p, data...),
439 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
440 rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
441 ret = append(ret, PieceStateRun{
442 PieceState: el.(PieceState),
446 for index := range t.pieces {
447 rle.Append(t.pieceState(index), 1)
453 // Produces a small string representing a PieceStateRun.
454 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
455 ret = fmt.Sprintf("%d", psr.Length)
456 ret += func() string {
457 switch psr.Priority {
458 case PiecePriorityNext:
460 case PiecePriorityNormal:
462 case PiecePriorityReadahead:
464 case PiecePriorityNow:
482 func (t *Torrent) writeStatus(w io.Writer) {
483 fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
484 fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
486 fmt.Fprintf(w, "Metadata have: ")
487 for _, h := range t.metadataCompletedChunks {
488 fmt.Fprintf(w, "%c", func() rune {
498 fmt.Fprintf(w, "Piece length: %s\n", func() string {
500 return fmt.Sprint(t.usualPieceSize())
506 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
507 fmt.Fprint(w, "Piece States:")
508 for _, psr := range t.pieceStateRuns() {
510 w.Write([]byte(pieceStateRunStatusChars(psr)))
514 fmt.Fprintf(w, "Reader Pieces:")
515 t.forReaderOffsetPieces(func(begin, end int) (again bool) {
516 fmt.Fprintf(w, " %d:%d", begin, end)
521 fmt.Fprintf(w, "Trackers:\n")
523 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
524 fmt.Fprintf(tw, " URL\tNext announce\tLast announce\n")
525 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
527 }).([]*trackerScraper) {
528 fmt.Fprintf(tw, " %s\n", ta.statusLine())
533 fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
535 fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
536 fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
537 fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
538 conns := t.connsAsSlice()
539 slices.Sort(conns, worseConn)
540 for i, c := range conns {
541 fmt.Fprintf(w, "%2d. ", i+1)
546 func (t *Torrent) haveInfo() bool {
550 // Returns a run-time generated MetaInfo that includes the info bytes and
551 // announce-list as currently known to the client.
552 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
553 return metainfo.MetaInfo{
554 CreationDate: time.Now().Unix(),
555 Comment: "dynamic metainfo from client",
556 CreatedBy: "go.torrent",
557 AnnounceList: t.metainfo.UpvertedAnnounceList(),
558 InfoBytes: func() []byte {
560 return t.metadataBytes
// BytesMissing is the exported, read-locked wrapper around
// bytesMissingLocked.
568 func (t *Torrent) BytesMissing() int64 {
570 defer t.mu().RUnlock()
571 return t.bytesMissingLocked()
574 func (t *Torrent) bytesMissingLocked() int64 {
// bytesLeft sums, over every incomplete piece (the flipped completedPieces
// bitmap), the piece length minus bytes already received (dirty bytes).
578 func (t *Torrent) bytesLeft() (left int64) {
579 bitmap.Flip(t.completedPieces, 0, t.numPieces()).IterTyped(func(piece int) bool {
580 p := &t.pieces[piece]
581 left += int64(p.length() - p.numDirtyBytes())
587 // Bytes left to give in tracker announces.
588 func (t *Torrent) bytesLeftAnnounce() uint64 {
590 return uint64(t.bytesLeft())
592 return math.MaxUint64
596 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
597 if t.pieceComplete(piece) {
600 if t.pieceAllDirty(piece) {
603 return t.pieces[piece].hasDirtyChunks()
606 func (t *Torrent) usualPieceSize() int {
607 return int(t.info.PieceLength)
610 func (t *Torrent) numPieces() int {
611 return t.info.NumPieces()
614 func (t *Torrent) numPiecesCompleted() (num int) {
615 return t.completedPieces.Len()
618 func (t *Torrent) close() (err error) {
620 if t.storage != nil {
623 t.storageLock.Unlock()
625 for conn := range t.conns {
628 t.cl.event.Broadcast()
629 t.pieceStateChanges.Close()
630 t.updateWantPeersEvent()
634 func (t *Torrent) requestOffset(r request) int64 {
635 return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
638 // Return the request that would include the given offset into the torrent
639 // data. Returns !ok if there is no such request.
640 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
641 return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
// writeChunk writes a received chunk to the piece's storage at the given
// offset, converting a short write into io.ErrShortWrite, and records the
// write duration via a perf timer.
644 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
645 tr := perf.NewTimer()
647 n, err := t.pieces[piece].Storage().WriteAt(data, begin)
648 if err == nil && n != len(data) {
649 err = io.ErrShortWrite
652 tr.Mark("write chunk")
657 func (t *Torrent) bitfield() (bf []bool) {
658 bf = make([]bool, t.numPieces())
659 t.completedPieces.IterTyped(func(piece int) (again bool) {
666 func (t *Torrent) pieceNumChunks(piece int) int {
667 return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
670 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
671 t.pieces[pieceIndex].dirtyChunks.Clear()
679 // Peer is known to support encryption.
680 SupportsEncryption bool
// pieceLength returns the byte length of the given piece: the final piece
// may be shorter (total length modulo piece length); all others are the
// info dict's PieceLength.
// NOTE(review): handling of a zero remainder for the final piece is elided
// in this listing view.
683 func (t *Torrent) pieceLength(piece int) pp.Integer {
684 if piece == t.numPieces()-1 {
685 ret := pp.Integer(*t.length % t.info.PieceLength)
690 return pp.Integer(t.info.PieceLength)
// hashPiece computes the piece's hash by streaming its storage through the
// pieceHash function, after waiting for pending writes to drain. Unexpected
// read errors (other than short data or missing files) are logged.
// NOTE(review): piece-length variable (pl) setup and error branch structure
// are partly elided in this listing.
693 func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
694 hash := pieceHash.New()
695 p := &t.pieces[piece]
// Ensure in-flight chunk writes complete so the hash sees consistent data.
696 p.waitNoPendingWrites()
697 ip := t.info.Piece(piece)
699 n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
701 missinggo.CopyExact(&ret, hash.Sum(nil))
704 if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
705 log.Printf("unexpected error hashing piece with %T: %s", t.storage.TorrentImpl, err)
710 func (t *Torrent) haveAnyPieces() bool {
711 for i := range t.pieces {
712 if t.pieceComplete(i) {
719 func (t *Torrent) havePiece(index int) bool {
720 return t.haveInfo() && t.pieceComplete(index)
723 func (t *Torrent) haveChunk(r request) (ret bool) {
725 // log.Println("have chunk", r, ret)
730 if t.pieceComplete(int(r.Index)) {
733 p := &t.pieces[r.Index]
734 return !p.pendingChunk(r.chunkSpec, t.chunkSize)
737 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
738 return int(cs.Begin / chunkSize)
741 func (t *Torrent) wantPiece(r request) bool {
742 if !t.wantPieceIndex(int(r.Index)) {
745 if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
748 // TODO: What about pieces that were wanted, but aren't now, and aren't
749 // completed either? That used to be done here.
// wantPieceIndex reports whether the piece at index is currently wanted:
// not out of range, not queued for hashing, not already complete; wanted
// outright if explicitly pending, otherwise wanted when any reader's
// region covers it (the final forReaderOffsetPieces returns false — i.e.
// breaks — on the first covering region).
753 func (t *Torrent) wantPieceIndex(index int) bool {
757 if index < 0 || index >= t.numPieces() {
760 p := &t.pieces[index]
761 if p.queuedForHash() {
767 if t.pieceComplete(index) {
770 if t.pendingPieces.Contains(index) {
773 return !t.forReaderOffsetPieces(func(begin, end int) bool {
774 return index < begin || index >= end
778 // The worst connection is one that hasn't been sent, or sent anything useful
779 // for the longest. A bad connection is one that usually sends us unwanted
780 // pieces, or has been in worser half of the established connections for more
// worstBadConn pops connections off a heap ordered worst-first and returns
// the first that qualifies as "bad": many unwanted chunks relative to useful
// ones, or languishing in the worse half past a one-minute grace period.
// NOTE(review): heap.Init, loop structure, and return paths are elided here.
782 func (t *Torrent) worstBadConn() *connection {
783 wcs := worseConnSlice{t.unclosedConnsAsSlice()}
786 c := heap.Pop(&wcs).(*connection)
787 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
790 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
791 // Give connections 1 minute to prove themselves.
792 if time.Since(c.completedHandshake) > time.Minute {
800 type PieceStateChange struct {
805 func (t *Torrent) publishPieceChange(piece int) {
806 cur := t.pieceState(piece)
807 p := &t.pieces[piece]
808 if cur != p.publicPieceState {
809 p.publicPieceState = cur
810 t.pieceStateChanges.Publish(PieceStateChange{
817 func (t *Torrent) pieceNumPendingChunks(piece int) int {
818 if t.pieceComplete(piece) {
821 return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
824 func (t *Torrent) pieceAllDirty(piece int) bool {
825 return t.pieces[piece].dirtyChunks.Len() == t.pieceNumChunks(piece)
828 func (t *Torrent) readersChanged() {
829 t.updateReaderPieces()
830 t.updateAllPiecePriorities()
833 func (t *Torrent) updateReaderPieces() {
834 t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities()
837 func (t *Torrent) readerPosChanged(from, to pieceRange) {
841 t.updateReaderPieces()
842 // Order the ranges, high and low.
844 if l.begin > h.begin {
848 // Two distinct ranges.
849 t.updatePiecePriorities(l.begin, l.end)
850 t.updatePiecePriorities(h.begin, h.end)
857 t.updatePiecePriorities(l.begin, end)
861 func (t *Torrent) maybeNewConns() {
862 // Tickle the accept routine.
863 t.cl.event.Broadcast()
867 func (t *Torrent) piecePriorityChanged(piece int) {
868 for c := range t.conns {
869 if c.updatePiecePriority(piece) {
874 t.publishPieceChange(piece)
877 func (t *Torrent) updatePiecePriority(piece int) {
878 p := &t.pieces[piece]
879 newPrio := t.piecePriorityUncached(piece)
880 if newPrio == p.priority {
884 t.piecePriorityChanged(piece)
887 func (t *Torrent) updateAllPiecePriorities() {
888 t.updatePiecePriorities(0, len(t.pieces))
891 // Update all piece priorities in one hit. This function should have the same
892 // output as updatePiecePriority, but across all pieces.
893 func (t *Torrent) updatePiecePriorities(begin, end int) {
894 for i := begin; i < end; i++ {
895 t.updatePiecePriority(i)
899 // Returns the range of pieces [begin, end) that contains the extent of bytes.
// Offsets at or beyond the torrent length yield an empty range; end is
// computed by rounding the extent's final byte up to a piece boundary and
// clamped to the piece count.
900 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
901 if off >= *t.length {
911 begin = int(off / t.info.PieceLength)
912 end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
913 if end > t.info.NumPieces() {
914 end = t.info.NumPieces()
919 // Returns true if all iterations complete without breaking. Returns the read
920 // regions for all readers. The reader regions should not be merged as some
921 // callers depend on this method to enumerate readers.
922 func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
923 for r := range t.readers {
925 if p.begin >= p.end {
928 if !f(p.begin, p.end) {
935 func (t *Torrent) piecePriority(piece int) piecePriority {
937 return PiecePriorityNone
939 return t.pieces[piece].priority
// piecePriorityUncached recomputes a piece's priority from first principles:
// complete pieces get None; pieces under an active reader get Now; pieces in
// a reader's readahead window get Readahead; explicitly pended pieces get
// Normal; everything else gets None.
942 func (t *Torrent) piecePriorityUncached(piece int) piecePriority {
943 if t.pieceComplete(piece) {
944 return PiecePriorityNone
946 if t.readerNowPieces.Contains(piece) {
947 return PiecePriorityNow
// Disabled "Next" tier kept for reference.
949 // if t.readerNowPieces.Contains(piece - 1) {
950 // return PiecePriorityNext
952 if t.readerReadaheadPieces.Contains(piece) {
953 return PiecePriorityReadahead
955 if t.pendingPieces.Contains(piece) {
956 return PiecePriorityNormal
958 return PiecePriorityNone
961 func (t *Torrent) pendPiece(piece int) {
962 if t.pendingPieces.Contains(piece) {
965 if t.havePiece(piece) {
968 t.pendingPieces.Add(piece)
969 t.updatePiecePriority(piece)
972 func (t *Torrent) unpendPieces(unpend bitmap.Bitmap) {
973 t.pendingPieces.Sub(unpend)
974 unpend.IterTyped(func(piece int) (again bool) {
975 t.updatePiecePriority(piece)
980 func (t *Torrent) pendPieceRange(begin, end int) {
981 for i := begin; i < end; i++ {
986 func (t *Torrent) unpendPieceRange(begin, end int) {
988 bm.AddRange(begin, end)
992 func (t *Torrent) pendRequest(req request) {
993 ci := chunkIndex(req.chunkSpec, t.chunkSize)
994 t.pieces[req.Index].pendChunkIndex(ci)
997 func (t *Torrent) pieceCompletionChanged(piece int) {
998 t.cl.event.Broadcast()
999 if t.pieceComplete(piece) {
1000 t.onPieceCompleted(piece)
1002 t.onIncompletePiece(piece)
1004 t.updatePiecePriority(piece)
1007 func (t *Torrent) openNewConns() {
1008 t.cl.openNewConns(t)
1011 func (t *Torrent) getConnPieceInclination() []int {
1012 _ret := t.connPieceInclinationPool.Get()
1014 pieceInclinationsNew.Add(1)
1015 return rand.Perm(t.numPieces())
1017 pieceInclinationsReused.Add(1)
1018 return *_ret.(*[]int)
1021 func (t *Torrent) putPieceInclination(pi []int) {
1022 t.connPieceInclinationPool.Put(&pi)
1023 pieceInclinationsPut.Add(1)
// updatePieceCompletion refreshes the cached completion state for a piece
// from storage, and fires pieceCompletionChanged when either the completion
// bit or the storage-ok flag actually changed.
1026 func (t *Torrent) updatePieceCompletion(piece int) {
1027 pcu := t.pieceCompleteUncached(piece)
1028 p := &t.pieces[piece]
1029 changed := t.completedPieces.Get(piece) != pcu.Complete || p.storageCompletionOk != pcu.Ok
1030 p.storageCompletionOk = pcu.Ok
1031 t.completedPieces.Set(piece, pcu.Complete)
1033 t.pieceCompletionChanged(piece)
1037 // Non-blocking read. Client lock is not required.
1038 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1039 p := &t.pieces[off/t.info.PieceLength]
1040 p.waitNoPendingWrites()
1041 return p.Storage().ReadAt(b, off-p.Info().Offset())
1044 func (t *Torrent) updateAllPieceCompletions() {
1045 for i := range iter.N(t.numPieces()) {
1046 t.updatePieceCompletion(i)
1050 // Returns an error if the metadata was completed, but couldn't be set for
1051 // some reason. Blame it on the last peer to contribute.
1052 func (t *Torrent) maybeCompleteMetadata() error {
1057 if !t.haveAllMetadataPieces() {
1058 // Don't have enough metadata pieces.
1061 err := t.setInfoBytes(t.metadataBytes)
1063 t.invalidateMetadata()
1064 return fmt.Errorf("error setting info bytes: %s", err)
1066 if t.cl.config.Debug {
1067 log.Printf("%s: got metadata from peers", t)
1072 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1073 t.forReaderOffsetPieces(func(begin, end int) bool {
1074 ret.AddRange(begin, end)
1080 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1081 t.forReaderOffsetPieces(func(begin, end int) bool {
1084 readahead.AddRange(begin+1, end)
1091 func (t *Torrent) needData() bool {
1092 if t.closed.IsSet() {
1098 if t.pendingPieces.Len() != 0 {
1101 // Read as "not all complete".
1102 return !t.readerPieces().IterTyped(func(piece int) bool {
1103 return t.pieceComplete(piece)
1107 func appendMissingStrings(old, new []string) (ret []string) {
1110 for _, n := range new {
1111 for _, o := range old {
1116 ret = append(ret, n)
1121 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1123 for minNumTiers > len(ret) {
1124 ret = append(ret, nil)
1129 func (t *Torrent) addTrackers(announceList [][]string) {
1130 fullAnnounceList := &t.metainfo.AnnounceList
1131 t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1132 for tierIndex, trackerURLs := range announceList {
1133 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1135 t.startMissingTrackerScrapers()
1136 t.updateWantPeersEvent()
1139 // Don't call this before the info is available.
1140 func (t *Torrent) bytesCompleted() int64 {
1144 return t.info.TotalLength() - t.bytesLeft()
1147 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1149 defer t.cl.mu.Unlock()
1150 return t.setInfoBytes(b)
1153 // Returns true if connection is removed from torrent.Conns.
1154 func (t *Torrent) deleteConnection(c *connection) (ret bool) {
1160 func (t *Torrent) dropConnection(c *connection) {
1161 t.cl.event.Broadcast()
1163 if t.deleteConnection(c) {
1168 func (t *Torrent) wantPeers() bool {
1169 if t.closed.IsSet() {
1172 if len(t.peers) > t.cl.config.TorrentPeersLowWater {
1175 return t.needData() || t.seeding()
1178 func (t *Torrent) updateWantPeersEvent() {
1180 t.wantPeersEvent.Set()
1182 t.wantPeersEvent.Clear()
1186 // Returns whether the client should make effort to seed the torrent.
1187 func (t *Torrent) seeding() bool {
1189 if t.closed.IsSet() {
1192 if cl.config.NoUpload {
1195 if !cl.config.Seed {
1198 if cl.config.DisableAggressiveUpload && t.needData() {
// startScrapingTracker starts a trackerScraper goroutine for the URL unless
// one is already registered; the announcer map is lazily initialized.
// NOTE(review): empty-URL guard and scraper field initializers are elided in
// this listing view.
1204 func (t *Torrent) startScrapingTracker(url string) {
1208 if _, ok := t.trackerAnnouncers[url]; ok {
1211 newAnnouncer := &trackerScraper{
1215 if t.trackerAnnouncers == nil {
1216 t.trackerAnnouncers = make(map[string]*trackerScraper)
1218 t.trackerAnnouncers[url] = newAnnouncer
1219 go newAnnouncer.Run()
1222 // Adds and starts tracker scrapers for tracker URLs that aren't already
1224 func (t *Torrent) startMissingTrackerScrapers() {
1225 if t.cl.config.DisableTrackers {
1228 t.startScrapingTracker(t.metainfo.Announce)
1229 for _, tier := range t.metainfo.AnnounceList {
1230 for _, url := range tier {
1231 t.startScrapingTracker(url)
1236 // Returns an AnnounceRequest with fields filled out to defaults and current
1238 func (t *Torrent) announceRequest() tracker.AnnounceRequest {
1239 return tracker.AnnounceRequest{
1240 Event: tracker.None,
1242 Port: uint16(t.cl.incomingPeerPort()),
1243 PeerId: t.cl.peerID,
1244 InfoHash: t.infoHash,
1245 Left: t.bytesLeftAnnounce(),
1249 // Adds peers revealed in an announce until the announce ends, or we have
1251 func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
1253 // Count all the unique addresses we got during this announce.
1254 allAddrs := make(map[string]struct{})
1257 case v, ok := <-pvs:
1261 addPeers := make([]Peer, 0, len(v.Peers))
1262 for _, cp := range v.Peers {
1264 // Can't do anything with this.
1267 addPeers = append(addPeers, Peer{
1270 Source: peerSourceDHTGetPeers,
1272 key := (&net.UDPAddr{
1276 allAddrs[key] = struct{}{}
1279 t.addPeers(addPeers)
1280 numPeers := len(t.peers)
1282 if numPeers >= cl.config.TorrentPeersHighWater {
1285 case <-t.closed.LockedChan(&cl.mu):
1291 func (t *Torrent) announceDHT(impliedPort bool) (err error) {
1293 ps, err := cl.dHT.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort)
1297 t.consumeDHTAnnounce(ps.Peers)
1302 func (t *Torrent) dhtAnnouncer() {
1306 case <-t.wantPeersEvent.LockedChan(&cl.mu):
1307 case <-t.closed.LockedChan(&cl.mu):
1310 err := t.announceDHT(true)
1313 defer cl.mu.Unlock()
1317 log.Printf("error announcing %q to DHT: %s", t, err)
1321 case <-t.closed.LockedChan(&cl.mu):
1323 case <-time.After(5 * time.Minute):
1328 func (t *Torrent) addPeers(peers []Peer) {
1329 for _, p := range peers {
1330 if t.cl.badPeerIPPort(p.IP, p.Port) {
1337 func (t *Torrent) Stats() TorrentStats {
1339 defer t.cl.mu.Unlock()
1341 t.stats.ActivePeers = len(t.conns)
1342 t.stats.HalfOpenPeers = len(t.halfOpen)
1343 t.stats.PendingPeers = len(t.peers)
1344 t.stats.TotalPeers = t.numTotalPeers()
1349 // The total number of peers in the torrent.
// Deduplicates across active connections (by remote address), half-open
// attempts (by dial address), and pending peers (formatted host:port) by
// collecting them all into one string-keyed set.
1350 func (t *Torrent) numTotalPeers() int {
1351 peers := make(map[string]struct{})
1352 for conn := range t.conns {
1353 ra := conn.conn.RemoteAddr()
1355 // It's been closed and doesn't support RemoteAddr.
1358 peers[ra.String()] = struct{}{}
1360 for addr := range t.halfOpen {
1361 peers[addr] = struct{}{}
1363 for _, peer := range t.peers {
1364 peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{}
1369 // Returns true if the connection is added.
// addConnection admits a completed-handshake connection into t.conns:
// rejects if the client is closed; resolves duplicate peer-ID connections by
// keeping the one initiated from the lower peer ID; evicts the worst bad
// connection when at the established-connection cap; then reconciles
// pre-association transfer stats and registers the connection.
// NOTE(review): several return statements and closing braces are elided in
// this listing view; code left untouched.
1370 func (t *Torrent) addConnection(c *connection, outgoing bool) bool {
1371 if t.cl.closed.IsSet() {
1377 for c0 := range t.conns {
1378 if c.PeerID == c0.PeerID {
1379 // Already connected to a client with that ID.
1380 duplicateClientConns.Add(1)
1381 lower := string(t.cl.peerID[:]) < string(c.PeerID[:])
1382 // Retain the connection from initiated from lower peer ID to
1384 if outgoing == lower {
1385 // Close the other one.
1387 // TODO: Is it safe to delete from the map while we're
1388 // iterating over it?
1389 t.deleteConnection(c0)
1391 // Abandon this one.
1396 if len(t.conns) >= t.maxEstablishedConns {
1397 c := t.worstBadConn()
1401 if t.cl.config.Debug && missinggo.CryHeard() {
1402 log.Printf("%s: dropping connection to make room for new one:\n %s", t, c)
1405 t.deleteConnection(c)
1407 if len(t.conns) >= t.maxEstablishedConns {
1411 panic("connection already associated with a torrent")
1413 // Reconcile bytes transferred before connection was associated with a
1415 t.stats.wroteBytes(c.stats.BytesWritten)
1416 t.stats.readBytes(c.stats.BytesRead)
1418 t.conns[c] = struct{}{}
1422 func (t *Torrent) wantConns() bool {
1423 if !t.networkingEnabled {
1426 if t.closed.IsSet() {
1429 if !t.seeding() && !t.needData() {
1432 if len(t.conns) < t.maxEstablishedConns {
1435 return t.worstBadConn() != nil
1438 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1440 defer t.cl.mu.Unlock()
1441 oldMax = t.maxEstablishedConns
1442 t.maxEstablishedConns = max
1443 wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
1444 for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1445 t.dropConnection(wcs.Pop().(*connection))
1451 func (t *Torrent) mu() missinggo.RWLocker {
// pieceHashed handles the outcome of a piece hash check: on success it
// credits the contributing connections, marks the piece complete in storage,
// and updates cached completion; on failure it debits the touchers, sorts
// them by trust, may ban the least-trusted peer's IP, and re-marks the piece
// incomplete.
// NOTE(review): branch structure (correct vs. incorrect paths) and several
// statements are elided in this listing view; code left untouched.
1455 func (t *Torrent) pieceHashed(piece int, correct bool) {
1456 if t.closed.IsSet() {
1459 p := &t.pieces[piece]
// Collect and clear the set of connections that wrote chunks to this piece.
1460 touchers := t.reapPieceTouchers(piece)
1462 // Don't score the first time a piece is hashed, it could be an
1465 pieceHashedCorrect.Add(1)
1467 log.Printf("%s: piece %d (%s) failed hash: %d connections contributed", t, piece, p.hash, len(touchers))
1468 pieceHashedNotCorrect.Add(1)
1473 for _, c := range touchers {
1474 c.goodPiecesDirtied++
1476 err := p.Storage().MarkComplete()
1478 log.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
1480 t.updatePieceCompletion(piece)
1482 if len(touchers) != 0 {
1483 for _, c := range touchers {
1484 // Y u do dis peer?!
1485 c.badPiecesDirtied++
1487 slices.Sort(touchers, connLessTrusted)
1488 if t.cl.config.Debug {
1489 log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int) {
1490 for _, c := range touchers {
1491 ret = append(ret, c.netGoodPiecesDirtied())
1497 t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
1500 t.onIncompletePiece(piece)
1504 func (t *Torrent) cancelRequestsForPiece(piece int) {
1505 // TODO: Make faster
1506 for cn := range t.conns {
1511 func (t *Torrent) onPieceCompleted(piece int) {
1512 t.pendingPieces.Remove(piece)
1513 t.pendAllChunkSpecs(piece)
1514 t.cancelRequestsForPiece(piece)
1515 for conn := range t.conns {
1520 func (t *Torrent) onIncompletePiece(piece int) {
1521 if t.pieceAllDirty(piece) {
1522 t.pendAllChunkSpecs(piece)
1524 if !t.wantPieceIndex(piece) {
1527 // We could drop any connections that we told we have a piece that we
1528 // don't here. But there's a test failure, and it seems clients don't care
1529 // if you request pieces that you already claim to have. Pruning bad
1530 // connections might just remove any connections that aren't treating us
1531 // favourably anyway.
1533 // for c := range t.conns {
1534 // if c.sentHave(piece) {
1538 for conn := range t.conns {
1539 if conn.PeerHasPiece(piece) {
1540 conn.updateRequests()
// verifyPiece performs a queued hash check for one piece: under the client
// lock it waits for in-progress hashing/storage availability, dequeues the
// piece (panicking if it was never queued), short-circuits if the torrent is
// closed or the piece already complete, then hashes under the storage read
// lock and reports the result via pieceHashed.
// NOTE(review): lock acquisition, hashing-flag toggles, and wait calls are
// partly elided in this listing view; code left untouched.
1545 func (t *Torrent) verifyPiece(piece int) {
1548 defer cl.mu.Unlock()
1549 p := &t.pieces[piece]
1552 cl.event.Broadcast()
1554 for p.hashing || t.storage == nil {
1557 if !p.t.piecesQueuedForHash.Remove(piece) {
1558 panic("piece was not queued")
1560 if t.closed.IsSet() || t.pieceComplete(piece) {
1561 t.updatePiecePriority(piece)
1565 t.publishPieceChange(piece)
// Storage read lock guards hashing against a concurrent Close.
1566 t.storageLock.RLock()
1568 sum := t.hashPiece(piece)
1569 t.storageLock.RUnlock()
1572 t.pieceHashed(piece, sum == p.hash)
1573 t.publishPieceChange(piece)
1576 // Return the connections that touched a piece, and clear the entry while
// doing so — each connection's peerTouchedPieces record for this piece is
// deleted as it is collected.
1578 func (t *Torrent) reapPieceTouchers(piece int) (ret []*connection) {
1579 for c := range t.conns {
1580 if _, ok := c.peerTouchedPieces[piece]; ok {
1581 ret = append(ret, c)
1582 delete(c.peerTouchedPieces, piece)
1588 func (t *Torrent) connsAsSlice() (ret []*connection) {
1589 for c := range t.conns {
1590 ret = append(ret, c)
1595 // Currently doesn't really queue, but should in the future.
// queuePieceCheck marks the piece as queued for hashing (no-op if already
// queued), publishes the state change, and spawns verifyPiece immediately
// rather than deferring to a worker queue.
1596 func (t *Torrent) queuePieceCheck(pieceIndex int) {
1597 piece := &t.pieces[pieceIndex]
1598 if piece.queuedForHash() {
1601 t.piecesQueuedForHash.Add(pieceIndex)
1602 t.publishPieceChange(pieceIndex)
1603 go t.verifyPiece(pieceIndex)
1606 func (t *Torrent) VerifyData() {
1607 for i := range iter.N(t.NumPieces()) {
1608 t.Piece(i).VerifyData()