15 "github.com/anacrolix/missinggo"
16 "github.com/anacrolix/missinggo/bitmap"
17 "github.com/anacrolix/missinggo/itertools"
18 "github.com/anacrolix/missinggo/perf"
19 "github.com/anacrolix/missinggo/pubsub"
20 "github.com/bradfitz/iter"
22 "github.com/anacrolix/torrent/bencode"
23 "github.com/anacrolix/torrent/metainfo"
24 pp "github.com/anacrolix/torrent/peer_protocol"
25 "github.com/anacrolix/torrent/storage"
// chunkIndexSpec returns the chunkSpec (begin/length within the piece) of the
// chunkIndex-th chunk of the given piece, delegating to the free function of
// the same name with this torrent's piece length and chunk size.
28 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
29 return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
// peersKey identifies a peer in Torrent.peers. Per its construction in
// addPeer (peersKey{string(p.IP), p.Port}), it pairs the IP bytes as a
// string with the port. Field declarations are elided in this extract.
32 type peersKey struct {
37 // Maintains state of torrent within a Client.
43 // Closed when no more network activity is desired. This includes
44 // announcing, and communicating with peers.
45 ceasingNetworking chan struct{}
// Immutable identifier of this torrent (the info-dict hash).
47 infoHash metainfo.Hash
49 // Values are the piece indices that changed.
50 pieceStateChanges *pubsub.PubSub
52 // Total length of the torrent in bytes. Stored because it's not O(1) to
53 // get this from the info dict.
56 // The storage to open when the info dict becomes available.
57 storageOpener storage.I
58 // Storage for torrent data.
59 storage storage.Torrent
61 // The info dict. nil if we don't have it (yet).
63 // Active peer connections, running message stream loops.
65 // Set of addrs to which we're attempting to connect. Connections are
66 // half-open until all handshakes are completed.
67 halfOpen map[string]struct{}
69 // Reserve of peers to connect to. A peer can be both here and in the
70 // active connections if we're told about the peer after connecting with
71 // them. That encourages us to reconnect to peers that are well known.
72 peers map[peersKey]Peer
75 // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
76 // mirror their respective URLs from the announce-list metainfo key.
77 trackers []trackerTier
78 // Name used if the info name isn't available.
80 // The bencoded bytes of the info dict.
82 // Each element corresponds to the 16KiB metadata pieces. If true, we have
83 // received that piece.
84 metadataCompletedChunks []bool
86 // Closed when .Info is set.
87 gotMetainfo chan struct{}
// Active Readers; their positions/readaheads drive piece priorities via
// forReaderOffsetPieces.
89 readers map[*Reader]struct{}
// Pieces explicitly requested for download (see pendPiece/unpendPieces).
91 pendingPieces bitmap.Bitmap
// Cache of per-piece completion, kept in sync by updatePieceCompletion.
92 completedPieces bitmap.Bitmap
// Pool of rand.Perm(numPieces) slices reused across connections (see
// getConnPieceInclination/putPieceInclination).
94 connPieceInclinationPool sync.Pool
// setDisplayName sets the fallback display name (body elided in this extract).
97 func (t *Torrent) setDisplayName(dn string) {
// pieceComplete reports piece completion from the cached bitmap; cheap,
// compare pieceCompleteUncached which hits storage.
101 func (t *Torrent) pieceComplete(piece int) bool {
102 return t.completedPieces.Get(piece)
// pieceCompleteUncached asks the storage layer directly whether the piece is
// complete; used to refresh the completedPieces cache.
105 func (t *Torrent) pieceCompleteUncached(piece int) bool {
106 return t.pieces[piece].Storage().GetIsComplete()
// numConnsUnchoked counts unchoked connections (loop body elided here).
109 func (t *Torrent) numConnsUnchoked() (num int) {
110 for _, c := range t.conns {
118 // There's a connection to that address already.
119 func (t *Torrent) addrActive(addr string) bool {
// True if the addr is half-open, or matches an existing conn's remote addr.
120 if _, ok := t.halfOpen[addr]; ok {
123 for _, c := range t.conns {
124 if c.remoteAddr().String() == addr {
// worstConns builds a worstConns heap/sorter over all non-closed connections,
// pre-sizing the slice to len(t.conns).
131 func (t *Torrent) worstConns(cl *Client) (wcs *worstConns) {
133 c: make([]*connection, 0, len(t.conns)),
137 for _, c := range t.conns {
138 if !c.closed.IsSet() {
139 wcs.c = append(wcs.c, c)
// ceaseNetworking idempotently closes ceasingNetworking (the select below
// presumably guards double-close — interior lines elided) and then tears
// down connections.
145 func (t *Torrent) ceaseNetworking() {
147 case <-t.ceasingNetworking:
151 close(t.ceasingNetworking)
152 for _, c := range t.conns {
// addPeer adds p to the peer reserve, bounded by torrentPeersHighWater and
// deduplicated by (IP, Port) key; a per-source counter is bumped on add.
157 func (t *Torrent) addPeer(p Peer, cl *Client) {
159 if len(t.peers) >= torrentPeersHighWater {
162 key := peersKey{string(p.IP), p.Port}
163 if _, ok := t.peers[key]; ok {
167 peersAddedBySource.Add(string(p.Source), 1)
// invalidateMetadata discards the partially/fully received info-dict bytes
// and the per-chunk progress tracking.
172 func (t *Torrent) invalidateMetadata() {
173 t.metadataBytes = nil
174 t.metadataCompletedChunks = nil
// saveMetadataPiece stores one 16KiB (1<<14) metadata chunk at its offset in
// metadataBytes, ignoring out-of-range indices, and marks it received.
178 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
182 if index >= len(t.metadataCompletedChunks) {
183 log.Printf("%s: ignoring metadata piece %d", t, index)
186 copy(t.metadataBytes[(1<<14)*index:], data)
187 t.metadataCompletedChunks[index] = true
// metadataPieceCount returns the number of 16KiB metadata pieces, rounding up.
190 func (t *Torrent) metadataPieceCount() int {
191 return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
// haveMetadataPiece: two return paths survive in this extract — one derives
// possession from the metadata byte length, the other from the per-chunk
// bools; the branch condition between them is elided.
194 func (t *Torrent) haveMetadataPiece(piece int) bool {
196 return (1<<14)*piece < len(t.metadataBytes)
198 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
// metadataSizeKnown: the size is known once the buffer has been allocated.
202 func (t *Torrent) metadataSizeKnown() bool {
203 return t.metadataBytes != nil
// metadataSize returns the info-dict byte length (0 if unknown).
206 func (t *Torrent) metadataSize() int {
207 return len(t.metadataBytes)
// infoPieceHashes splits the info dict's concatenated SHA-1 digests into
// 20-byte strings, one per piece.
210 func infoPieceHashes(info *metainfo.Info) (ret []string) {
211 for i := 0; i < len(info.Pieces); i += 20 {
212 ret = append(ret, string(info.Pieces[i:i+20]))
217 // Called when metadata for a torrent becomes available.
// Validates the info, opens storage, builds the per-piece state (hash,
// pending-writes condvar), resizes existing connections' bitfields, then
// marks every piece for hashing. Several interior lines are elided in this
// extract, including error-return paths.
218 func (t *Torrent) setMetadata(md *metainfo.Info, infoBytes []byte) (err error) {
219 err = validateInfo(md)
221 err = fmt.Errorf("bad info: %s", err)
224 t.info = &metainfo.InfoEx{
229 t.storage, err = t.storageOpener.OpenTorrent(t.info)
234 for _, f := range t.info.UpvertedFiles() {
237 t.metadataBytes = infoBytes
238 t.metadataCompletedChunks = nil
239 hashes := infoPieceHashes(md)
240 t.pieces = make([]piece, len(hashes))
241 for i, hash := range hashes {
242 piece := &t.pieces[i]
// Cond shares the piece's pendingWritesMutex so hashing can wait for writes.
245 piece.noPendingWrites.L = &piece.pendingWritesMutex
246 missinggo.CopyExact(piece.Hash[:], hash)
248 for _, conn := range t.conns {
// Connections that can't grow to the now-known piece count are dropped.
249 if err := conn.setNumPieces(t.numPieces()); err != nil {
250 log.Printf("closing connection: %s", err)
254 for i := range t.pieces {
255 t.updatePieceCompletion(i)
256 t.pieces[i].QueuedForHash = true
259 for i := range t.pieces {
// verifyPiece delegates piece verification to the client.
266 func (t *Torrent) verifyPiece(piece int) {
267 t.cl.verifyPiece(t, piece)
// haveAllMetadataPieces: false while the chunk tracker is unallocated;
// otherwise every chunk flag must be true (loop interior elided).
270 func (t *Torrent) haveAllMetadataPieces() bool {
274 if t.metadataCompletedChunks == nil {
277 for _, have := range t.metadataCompletedChunks {
285 // TODO: Propagate errors to disconnect peer.
// setMetadataSize allocates the metadata buffer and chunk tracker for a
// peer-announced size, rejecting non-positive or oversized values, then asks
// each connection for the pending metadata pieces.
286 func (t *Torrent) setMetadataSize(bytes int64, cl *Client) {
288 // We already know the correct metadata size.
291 if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
292 log.Printf("%s: received bad metadata size: %d", t, bytes)
// No-op if the buffer is already at exactly this size.
295 if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
298 t.metadataBytes = make([]byte, bytes)
299 t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
300 for _, c := range t.conns {
301 cl.requestPendingMetadata(t, c)
306 // The current working name for the torrent. Either the name in the info dict,
307 // or a display name given such as by the dn value in a magnet link, or "".
308 func (t *Torrent) name() string {
// pieceState assembles the public PieceState for a piece: priority,
// completion, hashing/queued flags, partial-download flag (some field
// assignments are elided in this extract).
315 func (t *Torrent) pieceState(index int) (ret PieceState) {
316 p := &t.pieces[index]
317 ret.Priority = t.piecePriority(index)
318 if t.pieceComplete(index) {
321 if p.QueuedForHash || p.Hashing {
324 if !ret.Complete && t.piecePartiallyDownloaded(index) {
// metadataPieceSize returns the byte size of the given metadata piece.
330 func (t *Torrent) metadataPieceSize(piece int) int {
331 return metadataPieceSize(len(t.metadataBytes), piece)
// newMetadataExtensionMessage builds a BEP 9 ut_metadata extended message:
// a bencoded header dict (including total_size) followed by the raw piece
// data, addressed with the peer's negotiated extension ID.
334 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
340 d["total_size"] = len(t.metadataBytes)
342 p, err := bencode.Marshal(d)
348 ExtendedID: c.PeerExtensionIDs["ut_metadata"],
349 ExtendedPayload: append(p, data...),
// pieceStateRuns run-length-encodes the per-piece states into
// (state, count) runs for compact status display.
353 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
354 rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
355 ret = append(ret, PieceStateRun{
356 PieceState: el.(PieceState),
360 for index := range t.pieces {
361 rle.Append(t.pieceState(index), 1)
367 // Produces a small string representing a PieceStateRun.
// Length first, then one character per state flag; the priority switch's
// case bodies are elided in this extract.
368 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
369 ret = fmt.Sprintf("%d", psr.Length)
370 ret += func() string {
371 switch psr.Priority {
372 case PiecePriorityNext:
374 case PiecePriorityNormal:
376 case PiecePriorityReadahead:
378 case PiecePriorityNow:
// writeStatus dumps a human-readable multi-line status report to w:
// infohash, metadata progress, piece states, reader windows, trackers, and
// peer/connection counts, with connections listed worst-first.
396 func (t *Torrent) writeStatus(w io.Writer, cl *Client) {
397 fmt.Fprintf(w, "Infohash: %x\n", t.infoHash)
398 fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
400 fmt.Fprintf(w, "Metadata have: ")
401 for _, h := range t.metadataCompletedChunks {
402 fmt.Fprintf(w, "%c", func() rune {
412 fmt.Fprintf(w, "Piece length: %s\n", func() string {
414 return fmt.Sprint(t.usualPieceSize())
420 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
421 fmt.Fprint(w, "Piece States:")
422 for _, psr := range t.pieceStateRuns() {
424 w.Write([]byte(pieceStateRunStatusChars(psr)))
428 fmt.Fprintf(w, "Reader Pieces:")
429 t.forReaderOffsetPieces(func(begin, end int) (again bool) {
430 fmt.Fprintf(w, " %d:%d", begin, end)
434 fmt.Fprintf(w, "Trackers: ")
435 for _, tier := range t.trackers {
436 for _, tr := range tier {
437 fmt.Fprintf(w, "%q ", tr)
441 fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
442 fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
443 fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
// Sort so the subsequent listing is ordered by connection badness.
444 sort.Sort(&worstConns{
449 for i, c := range t.conns {
450 fmt.Fprintf(w, "%2d. ", i+1)
// String falls back to the hex infohash when no name is available (the
// preferred-name branch is elided in this extract).
455 func (t *Torrent) String() string {
458 s = fmt.Sprintf("%x", t.infoHash)
// haveInfo reports whether the info dict is known (body elided here).
463 func (t *Torrent) haveInfo() bool {
467 // TODO: Include URIs that weren't converted to tracker clients.
// announceList converts the trackerTier slice to the metainfo [][]string form.
468 func (t *Torrent) announceList() (al [][]string) {
469 missinggo.CastSlice(&al, t.trackers)
473 // Returns a run-time generated MetaInfo that includes the info bytes and
474 // announce-list as currently known to the client.
475 func (t *Torrent) metainfo() *metainfo.MetaInfo {
// Caller contract: metadata must be present; this is a programmer error
// otherwise, hence the panic.
476 if t.metadataBytes == nil {
477 panic("info bytes not set")
479 return &metainfo.MetaInfo{
481 CreationDate: time.Now().Unix(),
482 Comment: "dynamic metainfo from client",
483 CreatedBy: "go.torrent",
484 AnnounceList: t.announceList(),
// bytesLeft sums the remaining bytes across all pieces.
488 func (t *Torrent) bytesLeft() (left int64) {
489 for i := 0; i < t.numPieces(); i++ {
490 left += int64(t.pieces[i].bytesLeft())
495 // Bytes left to give in tracker announces.
// Reports MaxUint64 when the real value isn't computable (the guard between
// the two returns is elided in this extract; presumably !haveInfo).
496 func (t *Torrent) bytesLeftAnnounce() uint64 {
498 return uint64(t.bytesLeft())
500 return math.MaxUint64
// piecePartiallyDownloaded: not complete, not all-dirty, but has some dirty
// chunks.
504 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
505 if t.pieceComplete(piece) {
508 if t.pieceAllDirty(piece) {
511 return t.pieces[piece].hasDirtyChunks()
// usualPieceSize is the nominal piece length from the info dict; every piece
// but possibly the last has this size.
514 func (t *Torrent) usualPieceSize() int {
515 return int(t.info.PieceLength)
// lastPieceSize is the (possibly shorter) size of the final piece.
518 func (t *Torrent) lastPieceSize() int {
519 return int(t.pieceLength(t.numPieces() - 1))
522 func (t *Torrent) numPieces() int {
523 return t.info.NumPieces()
// numPiecesCompleted counts set bits in the completion bitmap.
526 func (t *Torrent) numPiecesCompleted() (num int) {
527 return t.completedPieces.Len()
530 // Safe to call with or without client lock.
531 func (t *Torrent) isClosed() bool {
// close releases torrent resources: closes storage if it is an io.Closer,
// drops all connections, and closes the piece-state pubsub (interior lines
// elided in this extract).
540 func (t *Torrent) close() (err error) {
546 if c, ok := t.storage.(io.Closer); ok {
549 for _, conn := range t.conns {
552 t.pieceStateChanges.Close()
// requestOffset converts a request to an absolute byte offset in the torrent.
556 func (t *Torrent) requestOffset(r request) int64 {
557 return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
560 // Return the request that would include the given offset into the torrent
561 // data. Returns !ok if there is no such request.
562 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
563 return torrentOffsetRequest(t.length, t.info.PieceLength, int64(t.chunkSize), off)
// writeChunk writes received chunk data into the piece's storage at the
// given intra-piece offset, converting a short write to io.ErrShortWrite,
// and records the duration with a perf timer.
566 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
567 tr := perf.NewTimer()
569 n, err := t.pieces[piece].Storage().WriteAt(data, begin)
570 if err == nil && n != len(data) {
571 err = io.ErrShortWrite
574 tr.Stop("write chunk")
// bitfield renders piece completion as a []bool of length numPieces (the
// per-piece assignment inside IterTyped is elided in this extract).
579 func (t *Torrent) bitfield() (bf []bool) {
580 bf = make([]bool, t.numPieces())
581 t.completedPieces.IterTyped(func(piece int) (again bool) {
// validOutgoingRequest sanity-checks a request we intend to send: in-range
// piece index, chunk-aligned begin, length capped at chunkSize and not past
// the piece end; the final chunk of a piece may be shorter than chunkSize.
588 func (t *Torrent) validOutgoingRequest(r request) bool {
589 if r.Index >= pp.Integer(t.info.NumPieces()) {
592 if r.Begin%t.chunkSize != 0 {
595 if r.Length > t.chunkSize {
598 pieceLength := t.pieceLength(int(r.Index))
599 if r.Begin+r.Length > pieceLength {
602 return r.Length == t.chunkSize || r.Begin+r.Length == pieceLength
// pieceChunks enumerates the chunkSpecs covering a piece: full chunkSize
// chunks, with a shorter tail chunk if the piece length isn't a multiple.
605 func (t *Torrent) pieceChunks(piece int) (css []chunkSpec) {
// Capacity is the ceiling of pieceLength/chunkSize.
606 css = make([]chunkSpec, 0, (t.pieceLength(piece)+t.chunkSize-1)/t.chunkSize)
608 for left := t.pieceLength(piece); left != 0; left -= cs.Length {
610 if cs.Length > t.chunkSize {
611 cs.Length = t.chunkSize
613 css = append(css, cs)
614 cs.Begin += cs.Length
// pieceNumChunks: ceiling division of piece length by chunk size.
619 func (t *Torrent) pieceNumChunks(piece int) int {
620 return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
// pendAllChunkSpecs marks every chunk of the piece as not-yet-received.
623 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
624 t.pieces[pieceIndex].DirtyChunks.Clear()
632 // Peer is known to support encryption.
633 SupportsEncryption bool
// pieceLength returns the byte length of the given piece: the nominal
// PieceLength for all but the last piece, which gets the remainder of the
// total length (out-of-range handling is elided in this extract; note the
// remainder form is zero when length divides evenly — the elided lines
// presumably handle that case).
636 func (t *Torrent) pieceLength(piece int) (len_ pp.Integer) {
637 if piece < 0 || piece >= t.info.NumPieces() {
640 if piece == t.numPieces()-1 {
641 len_ = pp.Integer(t.length % t.info.PieceLength)
644 len_ = pp.Integer(t.info.PieceLength)
// hashPiece computes the piece's hash by streaming its storage through the
// torrent hash function, after waiting for pending writes to drain. A read
// error other than io.ErrUnexpectedEOF is logged as unexpected.
649 func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
650 hash := pieceHash.New()
651 p := &t.pieces[piece]
// Ensure concurrent chunk writes to this piece have completed before reading.
652 p.waitNoPendingWrites()
653 ip := t.info.Piece(piece)
655 n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
657 missinggo.CopyExact(&ret, hash.Sum(nil))
660 if err != io.ErrUnexpectedEOF {
661 log.Printf("unexpected error hashing piece with %T: %s", t.storage, err)
// haveAllPieces: every piece's completion bit is set (guard lines elided).
666 func (t *Torrent) haveAllPieces() bool {
670 return t.completedPieces.Len() == t.numPieces()
// haveAnyPieces: true if at least one piece is complete.
673 func (t *Torrent) haveAnyPieces() bool {
674 for i := range t.pieces {
675 if t.pieceComplete(i) {
// havePiece requires both known metadata and piece completion.
682 func (t *Torrent) havePiece(index int) bool {
683 return t.haveInfo() && t.pieceComplete(index)
// haveChunk: a chunk is had if its piece is complete, or the chunk is no
// longer pending within the piece.
686 func (t *Torrent) haveChunk(r request) (ret bool) {
688 // log.Println("have chunk", r, ret)
693 if t.pieceComplete(int(r.Index)) {
696 p := &t.pieces[r.Index]
697 return !p.pendingChunk(r.chunkSpec, t.chunkSize)
// chunkIndex maps a chunk's begin offset to its index within the piece.
700 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
701 return int(cs.Begin / chunkSize)
// wantPiece: we want the chunk's piece, and that specific chunk is still
// pending (i.e. not already received).
704 func (t *Torrent) wantPiece(r request) bool {
705 if !t.wantPieceIndex(int(r.Index)) {
708 if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
711 // TODO: What about pieces that were wanted, but aren't now, and aren't
712 // completed either? That used to be done here.
// wantPieceIndex: a piece is wanted if not complete and either explicitly
// pending, or inside some reader's window (the forReaderOffsetPieces
// callback returns false — breaking the iteration — exactly when the index
// falls inside [begin, end), so the negation means "wanted").
716 func (t *Torrent) wantPieceIndex(index int) bool {
720 p := &t.pieces[index]
727 if t.pieceComplete(index) {
730 if t.pendingPieces.Contains(index) {
733 return !t.forReaderOffsetPieces(func(begin, end int) bool {
734 return index < begin || index >= end
738 func (t *Torrent) forNeededPieces(f func(piece int) (more bool)) (all bool) {
739 return t.forReaderOffsetPieces(func(begin, end int) (more bool) {
740 for i := begin; begin < end; i++ {
// connHasWantedPieces: the connection's request-order bitmap is non-empty.
749 func (t *Torrent) connHasWantedPieces(c *connection) bool {
750 return !c.pieceRequestOrder.IsEmpty()
// extentPieces lists the piece indices overlapping the byte extent
// [off, off+_len), stepping by the nominal piece size.
753 func (t *Torrent) extentPieces(off, _len int64) (pieces []int) {
754 for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
755 pieces = append(pieces, int(i))
// worstBadConn pops connections from the worst-first heap and returns the
// first that qualifies as "bad": mostly-unwanted chunks (>=6 and more
// unwanted than useful), or — when at least half the socket budget is in
// use — older than a minute without proving itself. Interior control flow is
// partially elided in this extract.
760 func (t *Torrent) worstBadConn(cl *Client) *connection {
761 wcs := t.worstConns(cl)
764 c := heap.Pop(wcs).(*connection)
765 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
768 if wcs.Len() >= (socketsPerTorrent+1)/2 {
769 // Give connections 1 minute to prove themselves.
770 if time.Since(c.completedHandshake) > time.Minute {
// PieceStateChange is the event published on pieceStateChanges when a
// piece's public state changes (fields elided in this extract).
778 type PieceStateChange struct {
// publishPieceChange recomputes the piece's public state and, only if it
// differs from the cached copy, updates the cache and publishes an event —
// avoiding redundant notifications.
783 func (t *Torrent) publishPieceChange(piece int) {
784 cur := t.pieceState(piece)
785 p := &t.pieces[piece]
786 if cur != p.PublicPieceState {
787 p.PublicPieceState = cur
788 t.pieceStateChanges.Publish(PieceStateChange{
// pieceNumPendingChunks: chunks still needed for the piece; zero shortcut
// when complete (the early-return value line is elided in this extract).
795 func (t *Torrent) pieceNumPendingChunks(piece int) int {
796 if t.pieceComplete(piece) {
799 return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
// pieceAllDirty: every chunk of the piece has been received (dirtied).
802 func (t *Torrent) pieceAllDirty(piece int) bool {
803 return t.pieces[piece].DirtyChunks.Len() == t.pieceNumChunks(piece)
// forUrgentPieces iterates reader windows (callback body elided here).
806 func (t *Torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
807 return t.forReaderOffsetPieces(func(begin, end int) (again bool) {
// readersChanged recomputes all piece priorities after a Reader is added,
// removed, or repositioned.
817 func (t *Torrent) readersChanged() {
818 t.updatePiecePriorities()
821 func (t *Torrent) maybeNewConns() {
822 // Tickle the accept routine.
823 t.cl.event.Broadcast()
// piecePriorityChanged propagates a single piece's priority change to every
// connection and publishes the public state change.
827 func (t *Torrent) piecePriorityChanged(piece int) {
828 for _, c := range t.conns {
829 c.updatePiecePriority(piece)
832 t.publishPieceChange(piece)
// updatePiecePriority recomputes one piece's priority; returns early with no
// side effects when unchanged (the changed-path lines are elided in this
// extract).
835 func (t *Torrent) updatePiecePriority(piece int) bool {
836 p := &t.pieces[piece]
837 newPrio := t.piecePriorityUncached(piece)
838 if newPrio == p.priority {
845 // Update all piece priorities in one hit. This function should have the same
846 // output as updatePiecePriority, but across all pieces.
847 func (t *Torrent) updatePiecePriorities() {
848 newPrios := make([]piecePriority, t.numPieces())
// Explicitly pended pieces get at least Normal priority.
849 t.pendingPieces.IterTyped(func(piece int) (more bool) {
850 newPrios[piece] = PiecePriorityNormal
// Reader windows raise the first piece to Now and the rest to Readahead.
853 t.forReaderOffsetPieces(func(begin, end int) (next bool) {
855 newPrios[begin].Raise(PiecePriorityNow)
857 for i := begin + 1; i < end; i++ {
858 newPrios[i].Raise(PiecePriorityReadahead)
// Completed pieces override everything: priority None.
862 t.completedPieces.IterTyped(func(piece int) (more bool) {
863 newPrios[piece] = PiecePriorityNone
// Apply and notify only for pieces whose priority actually changed.
866 for i, prio := range newPrios {
867 if prio != t.pieces[i].priority {
868 t.pieces[i].priority = prio
869 t.piecePriorityChanged(i)
// byteRegionPieces maps a byte extent [off, off+size) to the half-open piece
// index range [begin, end), clamping end to the piece count (guards for
// negative/empty extents are elided in this extract).
874 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
885 begin = int(off / t.info.PieceLength)
886 end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
887 if end > t.info.NumPieces() {
888 end = t.info.NumPieces()
893 // Returns true if all iterations complete without breaking.
894 func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
895 // There's an opportunity here to build a map of beginning pieces, and a
896 // bitmap of the rest. I wonder if it's worth the allocation overhead.
897 for r := range t.readers {
// Snapshot the reader's position and readahead for this iteration.
899 pos, readahead := r.pos, r.readahead
904 begin, end := t.byteRegionPieces(pos, readahead)
// piecePriority returns the cached per-piece priority (None fallback branch
// condition elided in this extract; presumably when info is unknown).
915 func (t *Torrent) piecePriority(piece int) piecePriority {
917 return PiecePriorityNone
919 return t.pieces[piece].priority
// piecePriorityUncached recomputes priority from scratch: None if complete,
// Normal if explicitly pended, then raised to Now/Readahead by reader
// windows (the window-boundary conditions around the Now raise are elided).
922 func (t *Torrent) piecePriorityUncached(piece int) (ret piecePriority) {
923 ret = PiecePriorityNone
924 if t.pieceComplete(piece) {
927 if t.pendingPieces.Contains(piece) {
928 ret = PiecePriorityNormal
930 raiseRet := ret.Raise
931 t.forReaderOffsetPieces(func(begin, end int) (again bool) {
933 raiseRet(PiecePriorityNow)
935 if begin <= piece && piece < end {
936 raiseRet(PiecePriorityReadahead)
// pendPiece marks a piece as wanted, skipping pieces already pending or
// already had, then propagates the resulting priority change.
943 func (t *Torrent) pendPiece(piece int) {
944 if t.pendingPieces.Contains(piece) {
947 if t.havePiece(piece) {
950 t.pendingPieces.Add(piece)
951 if !t.updatePiecePriority(piece) {
954 t.piecePriorityChanged(piece)
// getCompletedPieces returns a copy of the completion bitmap.
957 func (t *Torrent) getCompletedPieces() (ret bitmap.Bitmap) {
958 return t.completedPieces.Copy()
// unpendPieces removes a set of pieces from pending and recomputes all
// priorities in one pass.
961 func (t *Torrent) unpendPieces(unpend *bitmap.Bitmap) {
962 t.pendingPieces.Sub(unpend)
963 t.updatePiecePriorities()
// pendPieceRange pends [begin, end) (loop body elided in this extract).
966 func (t *Torrent) pendPieceRange(begin, end int) {
967 for i := begin; i < end; i++ {
// unpendPieceRange builds a bitmap for [begin, end) and unpends it.
972 func (t *Torrent) unpendPieceRange(begin, end int) {
974 bm.AddRange(begin, end)
// connRequestPiecePendingChunks issues requests for the piece's undirtied
// chunks on c, in a random permutation, stopping when c.Request declines.
978 func (t *Torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) {
979 if !c.PeerHasPiece(piece) {
982 chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
983 return itertools.ForPerm(len(chunkIndices), func(i int) bool {
984 req := request{pp.Integer(piece), t.chunkIndexSpec(chunkIndices[i], piece)}
985 return c.Request(req)
// pendRequest marks the request's chunk as pending again within its piece.
989 func (t *Torrent) pendRequest(req request) {
990 ci := chunkIndex(req.chunkSpec, t.chunkSize)
991 t.pieces[req.Index].pendChunkIndex(ci)
// pieceChanged forwards piece-change handling to the client.
994 func (t *Torrent) pieceChanged(piece int) {
995 t.cl.pieceChanged(t, piece)
// openNewConns (body elided in this extract).
998 func (t *Torrent) openNewConns() {
// getConnPieceInclination returns a random piece-order permutation, reusing
// one from the pool when available; counters track new vs reused.
1002 func (t *Torrent) getConnPieceInclination() []int {
1003 _ret := t.connPieceInclinationPool.Get()
1005 pieceInclinationsNew.Add(1)
1006 return rand.Perm(t.numPieces())
1008 pieceInclinationsReused.Add(1)
// putPieceInclination returns a permutation slice to the pool for reuse.
1012 func (t *Torrent) putPieceInclination(pi []int) {
1013 t.connPieceInclinationPool.Put(pi)
1014 pieceInclinationsPut.Add(1)
// updatePieceCompletion refreshes one piece's cached completion bit from
// storage and fires pieceChanged when the bit flipped (the changed-guard
// line is elided in this extract).
1017 func (t *Torrent) updatePieceCompletion(piece int) {
1018 pcu := t.pieceCompleteUncached(piece)
1019 changed := t.completedPieces.Get(piece) != pcu
1020 t.completedPieces.Set(piece, pcu)
1022 t.pieceChanged(piece)
1026 // Non-blocking read. Client lock is not required.
// Waits for pending writes on the target piece, then reads from its storage
// at the piece-relative offset.
1027 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1028 p := &t.pieces[off/t.info.PieceLength]
1029 p.waitNoPendingWrites()
1030 return p.Storage().ReadAt(b, off-p.Info().Offset())
// updateAllPieceCompletions refreshes the completion cache for every piece.
1033 func (t *Torrent) updateAllPieceCompletions() {
1034 for i := range iter.N(t.numPieces()) {
1035 t.updatePieceCompletion(i)