16 "github.com/anacrolix/missinggo"
17 "github.com/anacrolix/missinggo/bitmap"
18 "github.com/anacrolix/missinggo/itertools"
19 "github.com/anacrolix/missinggo/perf"
20 "github.com/anacrolix/missinggo/pubsub"
21 "github.com/bradfitz/iter"
23 "github.com/anacrolix/torrent/bencode"
24 "github.com/anacrolix/torrent/metainfo"
25 pp "github.com/anacrolix/torrent/peer_protocol"
26 "github.com/anacrolix/torrent/storage"
29 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
30 return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
// peersKey identifies a peer by IP (held as a string) and port; used to
// deduplicate entries in Torrent.peers (see addPeer).
33 type peersKey struct {
38 // Maintains state of torrent within a Client.
44 // Closed when no more network activity is desired. This includes
45 // announcing, and communicating with peers.
46 ceasingNetworking chan struct{}
// The torrent's infohash, identifying it on the network.
48 infoHash metainfo.Hash
50 // Values are the piece indices that changed.
51 pieceStateChanges *pubsub.PubSub
53 // Total length of the torrent in bytes. Stored because it's not O(1) to
54 // get this from the info dict.
57 // The storage to open when the info dict becomes available.
58 storageOpener storage.I
59 // Storage for torrent data.
60 storage storage.Torrent
62 // The info dict. nil if we don't have it (yet).
64 // Active peer connections, running message stream loops.
66 // Set of addrs to which we're attempting to connect. Connections are
67 // half-open until all handshakes are completed.
68 halfOpen map[string]struct{}
70 // Reserve of peers to connect to. A peer can be both here and in the
71 // active connections if were told about the peer after connecting with
72 // them. That encourages us to reconnect to peers that are well known.
73 peers map[peersKey]Peer
76 // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
77 // mirror their respective URLs from the announce-list metainfo key.
78 trackers []trackerTier
79 // Name used if the info name isn't available.
81 // The bencoded bytes of the info dict.
83 // Each element corresponds to the 16KiB metadata pieces. If true, we have
84 // received that piece.
85 metadataCompletedChunks []bool
87 // Closed when .Info is set.
88 gotMetainfo chan struct{}
// Active Readers; their positions and readahead drive piece priorities
// (see forReaderOffsetPieces).
90 readers map[*Reader]struct{}
// Pieces explicitly requested for download (see pendPiece/unpendPieces).
92 pendingPieces bitmap.Bitmap
// Cached completion state per piece; kept in sync with storage by
// updatePieceCompletion.
93 completedPieces bitmap.Bitmap
// Pool of piece-ordering permutations reused across connections (see
// getConnPieceInclination / putPieceInclination).
95 connPieceInclinationPool sync.Pool
99 pieceInclinationsReused = expvar.NewInt("pieceInclinationsReused")
100 pieceInclinationsNew = expvar.NewInt("pieceInclinationsNew")
101 pieceInclinationsPut = expvar.NewInt("pieceInclinationsPut")
104 func (t *Torrent) setDisplayName(dn string) {
108 func (t *Torrent) pieceComplete(piece int) bool {
109 return t.completedPieces.Get(piece)
112 func (t *Torrent) pieceCompleteUncached(piece int) bool {
113 return t.pieces[piece].Storage().GetIsComplete()
116 func (t *Torrent) numConnsUnchoked() (num int) {
117 for _, c := range t.conns {
125 // There's a connection to that address already.
126 func (t *Torrent) addrActive(addr string) bool {
127 if _, ok := t.halfOpen[addr]; ok {
130 for _, c := range t.conns {
131 if c.remoteAddr().String() == addr {
138 func (t *Torrent) worstConns(cl *Client) (wcs *worstConns) {
140 c: make([]*connection, 0, len(t.conns)),
144 for _, c := range t.conns {
145 if !c.closed.IsSet() {
146 wcs.c = append(wcs.c, c)
152 func (t *Torrent) ceaseNetworking() {
154 case <-t.ceasingNetworking:
158 close(t.ceasingNetworking)
159 for _, c := range t.conns {
164 func (t *Torrent) addPeer(p Peer, cl *Client) {
166 if len(t.peers) >= torrentPeersHighWater {
169 key := peersKey{string(p.IP), p.Port}
170 if _, ok := t.peers[key]; ok {
174 peersAddedBySource.Add(string(p.Source), 1)
179 func (t *Torrent) invalidateMetadata() {
180 t.metadataBytes = nil
181 t.metadataCompletedChunks = nil
// saveMetadataPiece stores a received ut_metadata piece into the metadata
// buffer and marks it completed. Out-of-range indices are logged and
// dropped.
185 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
189 if index >= len(t.metadataCompletedChunks) {
190 log.Printf("%s: ignoring metadata piece %d", t, index)
// Metadata pieces are 1<<14 (16 KiB) each; index scales the buffer offset.
193 copy(t.metadataBytes[(1<<14)*index:], data)
194 t.metadataCompletedChunks[index] = true
197 func (t *Torrent) metadataPieceCount() int {
198 return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
201 func (t *Torrent) haveMetadataPiece(piece int) bool {
203 return (1<<14)*piece < len(t.metadataBytes)
205 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
209 func (t *Torrent) metadataSizeKnown() bool {
210 return t.metadataBytes != nil
213 func (t *Torrent) metadataSize() int {
214 return len(t.metadataBytes)
217 func infoPieceHashes(info *metainfo.Info) (ret []string) {
218 for i := 0; i < len(info.Pieces); i += 20 {
219 ret = append(ret, string(info.Pieces[i:i+20]))
224 // Called when metadata for a torrent becomes available.
// On success it opens storage, builds the piece table (copying each
// 20-byte hash), resizes existing connections to the now-known piece
// count, and queues every piece for hash checking.
225 func (t *Torrent) setMetadata(md *metainfo.Info, infoBytes []byte) (err error) {
226 err = validateInfo(md)
228 err = fmt.Errorf("bad info: %s", err)
231 t.info = &metainfo.InfoEx{
236 t.storage, err = t.storageOpener.OpenTorrent(t.info)
241 for _, f := range t.info.UpvertedFiles() {
// Keep the raw bencoded info bytes; per-chunk completion tracking is no
// longer needed once the full metadata is present.
244 t.metadataBytes = infoBytes
245 t.metadataCompletedChunks = nil
246 hashes := infoPieceHashes(md)
247 t.pieces = make([]piece, len(hashes))
248 for i, hash := range hashes {
249 piece := &t.pieces[i]
252 piece.noPendingWrites.L = &piece.pendingWritesMutex
253 missinggo.CopyExact(piece.Hash[:], hash)
// Existing connections must learn the piece count; failures are logged
// (and presumably the connection is closed — confirm against elided lines).
255 for _, conn := range t.conns {
256 if err := conn.setNumPieces(t.numPieces()); err != nil {
257 log.Printf("closing connection: %s", err)
261 for i := range t.pieces {
262 t.updatePieceCompletion(i)
263 t.pieces[i].QueuedForHash = true
266 for i := range t.pieces {
273 func (t *Torrent) verifyPiece(piece int) {
274 t.cl.verifyPiece(t, piece)
277 func (t *Torrent) haveAllMetadataPieces() bool {
281 if t.metadataCompletedChunks == nil {
284 for _, have := range t.metadataCompletedChunks {
// setMetadataSize records the total size of the info dict as learned from
// a peer (ut_metadata), allocating the metadata buffer and per-chunk
// completion flags, then requesting pending metadata on all connections.
292 func (t *Torrent) setMetadataSize(bytes int64, cl *Client) {
294 // We already know the correct metadata size.
// Reject nonsense sizes; 10 MB is an arbitrary upper bound.
297 if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
298 log.Printf("received bad metadata size: %d", bytes)
// No-op when a buffer of exactly this size already exists.
301 if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
304 t.metadataBytes = make([]byte, bytes)
305 t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
306 for _, c := range t.conns {
307 cl.requestPendingMetadata(t, c)
312 // The current working name for the torrent. Either the name in the info dict,
313 // or a display name given such as by the dn value in a magnet link, or "".
314 func (t *Torrent) name() string {
321 func (t *Torrent) pieceState(index int) (ret PieceState) {
322 p := &t.pieces[index]
323 ret.Priority = t.piecePriority(index)
324 if t.pieceComplete(index) {
327 if p.QueuedForHash || p.Hashing {
330 if !ret.Complete && t.piecePartiallyDownloaded(index) {
336 func (t *Torrent) metadataPieceSize(piece int) int {
337 return metadataPieceSize(len(t.metadataBytes), piece)
340 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
346 d["total_size"] = len(t.metadataBytes)
348 p, err := bencode.Marshal(d)
354 ExtendedID: byte(c.PeerExtensionIDs["ut_metadata"]),
355 ExtendedPayload: append(p, data...),
359 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
360 rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
361 ret = append(ret, PieceStateRun{
362 PieceState: el.(PieceState),
366 for index := range t.pieces {
367 rle.Append(t.pieceState(index), 1)
373 // Produces a small string representing a PieceStateRun.
374 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
375 ret = fmt.Sprintf("%d", psr.Length)
376 ret += func() string {
377 switch psr.Priority {
378 case PiecePriorityNext:
380 case PiecePriorityNormal:
382 case PiecePriorityReadahead:
384 case PiecePriorityNow:
402 func (t *Torrent) writeStatus(w io.Writer, cl *Client) {
403 fmt.Fprintf(w, "Infohash: %x\n", t.infoHash)
404 fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
406 fmt.Fprintf(w, "Metadata have: ")
407 for _, h := range t.metadataCompletedChunks {
408 fmt.Fprintf(w, "%c", func() rune {
418 fmt.Fprintf(w, "Piece length: %s\n", func() string {
420 return fmt.Sprint(t.usualPieceSize())
426 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
427 fmt.Fprint(w, "Piece States:")
428 for _, psr := range t.pieceStateRuns() {
430 w.Write([]byte(pieceStateRunStatusChars(psr)))
434 fmt.Fprintf(w, "Reader Pieces:")
435 t.forReaderOffsetPieces(func(begin, end int) (again bool) {
436 fmt.Fprintf(w, " %d:%d", begin, end)
440 fmt.Fprintf(w, "Trackers: ")
441 for _, tier := range t.trackers {
442 for _, tr := range tier {
443 fmt.Fprintf(w, "%q ", tr)
447 fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
448 fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
449 fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
450 sort.Sort(&worstConns{
455 for i, c := range t.conns {
456 fmt.Fprintf(w, "%2d. ", i+1)
461 func (t *Torrent) String() string {
464 s = fmt.Sprintf("%x", t.infoHash)
469 func (t *Torrent) haveInfo() bool {
473 // TODO: Include URIs that weren't converted to tracker clients.
474 func (t *Torrent) announceList() (al [][]string) {
475 missinggo.CastSlice(&al, t.trackers)
479 // Returns a run-time generated MetaInfo that includes the info bytes and
480 // announce-list as currently known to the client.
481 func (t *Torrent) metainfo() *metainfo.MetaInfo {
482 if t.metadataBytes == nil {
483 panic("info bytes not set")
485 return &metainfo.MetaInfo{
487 CreationDate: time.Now().Unix(),
488 Comment: "dynamic metainfo from client",
489 CreatedBy: "go.torrent",
490 AnnounceList: t.announceList(),
494 func (t *Torrent) bytesLeft() (left int64) {
495 for i := 0; i < t.numPieces(); i++ {
496 left += int64(t.pieces[i].bytesLeft())
501 // Bytes left to give in tracker announces.
502 func (t *Torrent) bytesLeftAnnounce() uint64 {
504 return uint64(t.bytesLeft())
506 return math.MaxUint64
510 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
511 if t.pieceComplete(piece) {
514 if t.pieceAllDirty(piece) {
517 return t.pieces[piece].hasDirtyChunks()
520 func (t *Torrent) usualPieceSize() int {
521 return int(t.info.PieceLength)
524 func (t *Torrent) lastPieceSize() int {
525 return int(t.pieceLength(t.numPieces() - 1))
528 func (t *Torrent) numPieces() int {
529 return t.info.NumPieces()
532 func (t *Torrent) numPiecesCompleted() (num int) {
533 return t.completedPieces.Len()
536 // Safe to call with or without client lock.
537 func (t *Torrent) isClosed() bool {
546 func (t *Torrent) close() (err error) {
552 if c, ok := t.storage.(io.Closer); ok {
555 for _, conn := range t.conns {
558 t.pieceStateChanges.Close()
562 func (t *Torrent) requestOffset(r request) int64 {
563 return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
566 // Return the request that would include the given offset into the torrent
567 // data. Returns !ok if there is no such request.
568 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
569 return torrentOffsetRequest(t.length, t.info.PieceLength, int64(t.chunkSize), off)
// writeChunk writes chunk data at the given offset within a piece's
// storage, coercing a silent short write into io.ErrShortWrite. Timed via
// the perf package.
572 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
573 tr := perf.NewTimer()
575 n, err := t.pieces[piece].Storage().WriteAt(data, begin)
576 if err == nil && n != len(data) {
577 err = io.ErrShortWrite
580 tr.Stop("write chunk")
585 func (t *Torrent) bitfield() (bf []bool) {
586 bf = make([]bool, t.numPieces())
587 t.completedPieces.IterTyped(func(piece int) (again bool) {
594 func (t *Torrent) validOutgoingRequest(r request) bool {
595 if r.Index >= pp.Integer(t.info.NumPieces()) {
598 if r.Begin%t.chunkSize != 0 {
601 if r.Length > t.chunkSize {
604 pieceLength := t.pieceLength(int(r.Index))
605 if r.Begin+r.Length > pieceLength {
608 return r.Length == t.chunkSize || r.Begin+r.Length == pieceLength
611 func (t *Torrent) pieceChunks(piece int) (css []chunkSpec) {
612 css = make([]chunkSpec, 0, (t.pieceLength(piece)+t.chunkSize-1)/t.chunkSize)
614 for left := t.pieceLength(piece); left != 0; left -= cs.Length {
616 if cs.Length > t.chunkSize {
617 cs.Length = t.chunkSize
619 css = append(css, cs)
620 cs.Begin += cs.Length
625 func (t *Torrent) pieceNumChunks(piece int) int {
626 return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
629 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
630 t.pieces[pieceIndex].DirtyChunks.Clear()
638 // Peer is known to support encryption.
639 SupportsEncryption bool
642 func (t *Torrent) pieceLength(piece int) (len_ pp.Integer) {
643 if piece < 0 || piece >= t.info.NumPieces() {
646 if int(piece) == t.numPieces()-1 {
647 len_ = pp.Integer(t.length % t.info.PieceLength)
650 len_ = pp.Integer(t.info.PieceLength)
// hashPiece computes the piece's hash, first waiting out any in-flight
// chunk writes so the storage view is stable.
655 func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
656 hash := pieceHash.New()
657 p := &t.pieces[piece]
658 p.waitNoPendingWrites()
659 ip := t.info.Piece(piece)
// Stream the piece's bytes from storage into the hash.
661 n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
663 missinggo.CopyExact(&ret, hash.Sum(nil))
// io.ErrUnexpectedEOF means storage held less than the full piece (an
// incomplete piece) and is tolerated; anything else is surprising.
666 if err != io.ErrUnexpectedEOF {
667 log.Printf("unexpected error hashing piece with %T: %s", t.storage, err)
672 func (t *Torrent) haveAllPieces() bool {
676 return t.completedPieces.Len() == t.numPieces()
679 func (me *Torrent) haveAnyPieces() bool {
680 for i := range me.pieces {
681 if me.pieceComplete(i) {
688 func (t *Torrent) havePiece(index int) bool {
689 return t.haveInfo() && t.pieceComplete(index)
692 func (t *Torrent) haveChunk(r request) (ret bool) {
694 // log.Println("have chunk", r, ret)
699 if t.pieceComplete(int(r.Index)) {
702 p := &t.pieces[r.Index]
703 return !p.pendingChunk(r.chunkSpec, t.chunkSize)
706 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
707 return int(cs.Begin / chunkSize)
710 // TODO: This should probably be called wantPiece.
711 func (t *Torrent) wantChunk(r request) bool {
712 if !t.wantPiece(int(r.Index)) {
715 if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
718 // TODO: What about pieces that were wanted, but aren't now, and aren't
719 // completed either? That used to be done here.
723 // TODO: This should be called wantPieceIndex.
724 func (t *Torrent) wantPiece(index int) bool {
728 p := &t.pieces[index]
735 if t.pieceComplete(index) {
738 if t.pendingPieces.Contains(index) {
741 return !t.forReaderOffsetPieces(func(begin, end int) bool {
742 return index < begin || index >= end
746 func (t *Torrent) forNeededPieces(f func(piece int) (more bool)) (all bool) {
747 return t.forReaderOffsetPieces(func(begin, end int) (more bool) {
748 for i := begin; begin < end; i++ {
757 func (t *Torrent) connHasWantedPieces(c *connection) bool {
758 return !c.pieceRequestOrder.IsEmpty()
761 func (t *Torrent) extentPieces(off, _len int64) (pieces []int) {
762 for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
763 pieces = append(pieces, int(i))
768 func (t *Torrent) worstBadConn(cl *Client) *connection {
769 wcs := t.worstConns(cl)
772 c := heap.Pop(wcs).(*connection)
773 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
776 if wcs.Len() >= (socketsPerTorrent+1)/2 {
777 // Give connections 1 minute to prove themselves.
778 if time.Since(c.completedHandshake) > time.Minute {
786 type PieceStateChange struct {
791 func (t *Torrent) publishPieceChange(piece int) {
792 cur := t.pieceState(piece)
793 p := &t.pieces[piece]
794 if cur != p.PublicPieceState {
795 p.PublicPieceState = cur
796 t.pieceStateChanges.Publish(PieceStateChange{
// pieceNumPendingChunks returns how many chunks of the piece still need to
// be downloaded (complete pieces have none pending).
803 func (t *Torrent) pieceNumPendingChunks(piece int) int {
804 if t.pieceComplete(piece) {
807 return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
810 func (t *Torrent) pieceAllDirty(piece int) bool {
811 return t.pieces[piece].DirtyChunks.Len() == t.pieceNumChunks(piece)
814 func (t *Torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
815 return t.forReaderOffsetPieces(func(begin, end int) (again bool) {
825 func (t *Torrent) readersChanged() {
826 t.updatePiecePriorities()
// maybeNewConns signals the client that conditions may now permit opening
// additional connections.
829 func (t *Torrent) maybeNewConns() {
830 // Tickle the accept routine.
831 t.cl.event.Broadcast()
835 func (t *Torrent) piecePriorityChanged(piece int) {
836 for _, c := range t.conns {
837 c.updatePiecePriority(piece)
840 t.publishPieceChange(piece)
843 func (t *Torrent) updatePiecePriority(piece int) bool {
844 p := &t.pieces[piece]
845 newPrio := t.piecePriorityUncached(piece)
846 if newPrio == p.priority {
853 // Update all piece priorities in one hit. This function should have the same
854 // output as updatePiecePriority, but across all pieces.
855 func (t *Torrent) updatePiecePriorities() {
856 newPrios := make([]piecePriority, t.numPieces())
// Explicitly pended pieces start at normal priority.
857 t.pendingPieces.IterTyped(func(piece int) (more bool) {
858 newPrios[piece] = PiecePriorityNormal
// Reader windows raise their first piece to "now" and the rest of the
// window to "readahead".
861 t.forReaderOffsetPieces(func(begin, end int) (next bool) {
863 newPrios[begin].Raise(PiecePriorityNow)
865 for i := begin + 1; i < end; i++ {
866 newPrios[i].Raise(PiecePriorityReadahead)
// Completed pieces need no priority at all.
870 t.completedPieces.IterTyped(func(piece int) (more bool) {
871 newPrios[piece] = PiecePriorityNone
// Apply only the priorities that actually changed, notifying per piece.
874 for i, prio := range newPrios {
875 if prio != t.pieces[i].priority {
876 t.pieces[i].priority = prio
877 t.piecePriorityChanged(i)
882 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
893 begin = int(off / t.info.PieceLength)
894 end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
895 if end > t.info.NumPieces() {
896 end = t.info.NumPieces()
901 // Returns true if all iterations complete without breaking.
902 func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
903 // There's an oppurtunity here to build a map of beginning pieces, and a
904 // bitmap of the rest. I wonder if it's worth the allocation overhead.
905 for r := range t.readers {
907 pos, readahead := r.pos, r.readahead
912 begin, end := t.byteRegionPieces(pos, readahead)
923 func (t *Torrent) piecePriority(piece int) piecePriority {
925 return PiecePriorityNone
927 return t.pieces[piece].priority
// piecePriorityUncached recomputes the piece's priority from scratch:
// none when complete, normal when explicitly pended, and raised when the
// piece falls inside a reader's window.
930 func (t *Torrent) piecePriorityUncached(piece int) (ret piecePriority) {
931 ret = PiecePriorityNone
932 if t.pieceComplete(piece) {
935 if t.pendingPieces.Contains(piece) {
936 ret = PiecePriorityNormal
938 raiseRet := ret.Raise
939 t.forReaderOffsetPieces(func(begin, end int) (again bool) {
941 raiseRet(PiecePriorityNow)
// Pieces elsewhere inside a reader window get readahead priority.
943 if begin <= piece && piece < end {
944 raiseRet(PiecePriorityReadahead)
951 func (t *Torrent) pendPiece(piece int) {
952 if t.pendingPieces.Contains(piece) {
955 if t.havePiece(piece) {
958 t.pendingPieces.Add(piece)
959 if !t.updatePiecePriority(piece) {
962 t.piecePriorityChanged(piece)
965 func (t *Torrent) getCompletedPieces() (ret bitmap.Bitmap) {
966 return t.completedPieces.Copy()
969 func (t *Torrent) unpendPieces(unpend *bitmap.Bitmap) {
970 t.pendingPieces.Sub(unpend)
971 t.updatePiecePriorities()
974 func (t *Torrent) pendPieceRange(begin, end int) {
975 for i := begin; i < end; i++ {
980 func (t *Torrent) unpendPieceRange(begin, end int) {
982 bm.AddRange(begin, end)
986 func (t *Torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) {
987 if !c.PeerHasPiece(piece) {
990 chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
991 return itertools.ForPerm(len(chunkIndices), func(i int) bool {
992 req := request{pp.Integer(piece), t.chunkIndexSpec(chunkIndices[i], piece)}
993 return c.Request(req)
997 func (t *Torrent) pendRequest(req request) {
998 ci := chunkIndex(req.chunkSpec, t.chunkSize)
999 t.pieces[req.Index].pendChunkIndex(ci)
1002 func (t *Torrent) pieceChanged(piece int) {
1003 t.cl.pieceChanged(t, piece)
1006 func (t *Torrent) openNewConns() {
1007 t.cl.openNewConns(t)
// getConnPieceInclination returns a random piece ordering for a
// connection, reusing one from the pool when available; expvar counters
// track pool hits and misses.
1010 func (t *Torrent) getConnPieceInclination() []int {
1011 _ret := t.connPieceInclinationPool.Get()
// Pool miss: generate a fresh random permutation of all piece indices.
1013 pieceInclinationsNew.Add(1)
1014 return rand.Perm(t.numPieces())
1016 pieceInclinationsReused.Add(1)
1020 func (t *Torrent) putPieceInclination(pi []int) {
1021 t.connPieceInclinationPool.Put(pi)
1022 pieceInclinationsPut.Add(1)
// updatePieceCompletion refreshes the piece's cached completion state from
// storage, notifying the client when the state changed.
1025 func (t *Torrent) updatePieceCompletion(piece int) {
1026 pcu := t.pieceCompleteUncached(piece)
// Record whether the cached bitmap disagreed with storage before syncing.
1027 changed := t.completedPieces.Get(piece) != pcu
1028 t.completedPieces.Set(piece, pcu)
1030 t.pieceChanged(piece)
1034 // Non-blocking read. Client lock is not required.
1035 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1036 p := &t.pieces[off/t.info.PieceLength]
1037 p.waitNoPendingWrites()
1038 return p.Storage().ReadAt(b, off-p.Info().Offset())
1041 func (t *Torrent) updateAllPieceCompletions() {
1042 for i := range iter.N(t.numPieces()) {
1043 t.updatePieceCompletion(i)