17 "github.com/anacrolix/missinggo"
18 "github.com/anacrolix/missinggo/bitmap"
19 "github.com/anacrolix/missinggo/itertools"
20 "github.com/anacrolix/missinggo/perf"
21 "github.com/anacrolix/missinggo/pubsub"
22 "github.com/bradfitz/iter"
24 "github.com/anacrolix/torrent/bencode"
25 "github.com/anacrolix/torrent/metainfo"
26 pp "github.com/anacrolix/torrent/peer_protocol"
27 "github.com/anacrolix/torrent/storage"
28 "github.com/anacrolix/torrent/tracker"
31 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
32 return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
35 type peersKey struct {
// Maintains state of torrent within a Client.
	// Set when the torrent is being shut down; checked by long-running
	// loops such as waitWantPeers.
	closed   missinggo.Event
	infoHash metainfo.Hash

	// Values are the piece indices that changed.
	pieceStateChanges *pubsub.PubSub

	// Total length of the torrent in bytes. Stored because it's not O(1) to
	// get this from the info dict.

	// The storage to open when the info dict becomes available.
	storageOpener storage.Client
	// Storage for torrent data, opened in setInfoBytes.
	storage storage.Torrent

	// The info dict. nil if we don't have it (yet).

	// Active peer connections, running message stream loops.

	// Set of addrs to which we're attempting to connect. Connections are
	// half-open until all handshakes are completed.
	halfOpen map[string]struct{}

	// Reserve of peers to connect to. A peer can be both here and in the
	// active connections if were told about the peer after connecting with
	// them. That encourages us to reconnect to peers that are well known.
	peers map[peersKey]Peer

	// BEP 12 Multitracker Metadata Extension. The tracker.Client instances
	// mirror their respective URLs from the announce-list metainfo key.
	trackers []trackerTier
	// Name used if the info name isn't available.
	// The bencoded bytes of the info dict.
	// Each element corresponds to the 16KiB metadata pieces. If true, we have
	// received that piece.
	metadataCompletedChunks []bool

	// Set when .Info is obtained.
	gotMetainfo missinggo.Event

	// Readers whose positions drive piece priorities (see forReaderOffsetPieces).
	readers map[*Reader]struct{}

	// Pieces explicitly requested for download (pendPiece), and the cached
	// set of pieces whose storage reports completion.
	pendingPieces   bitmap.Bitmap
	completedPieces bitmap.Bitmap

	// Pool of rand.Perm piece orderings reused across connections; see
	// getConnPieceInclination / putPieceInclination.
	connPieceInclinationPool sync.Pool
95 func (t *Torrent) setDisplayName(dn string) {
99 func (t *Torrent) pieceComplete(piece int) bool {
100 return t.completedPieces.Get(piece)
103 func (t *Torrent) pieceCompleteUncached(piece int) bool {
104 return t.pieces[piece].Storage().GetIsComplete()
107 func (t *Torrent) numConnsUnchoked() (num int) {
108 for _, c := range t.conns {
116 // There's a connection to that address already.
117 func (t *Torrent) addrActive(addr string) bool {
118 if _, ok := t.halfOpen[addr]; ok {
121 for _, c := range t.conns {
122 if c.remoteAddr().String() == addr {
129 func (t *Torrent) worstConns(cl *Client) (wcs *worstConns) {
131 c: make([]*connection, 0, len(t.conns)),
135 for _, c := range t.conns {
136 if !c.closed.IsSet() {
137 wcs.c = append(wcs.c, c)
// addPeer adds a candidate peer to the reserve, deduplicating by address and
// bounding the reserve size.
func (t *Torrent) addPeer(p Peer, cl *Client) {
	// Don't let the reserve grow without bound.
	if len(t.peers) >= torrentPeersHighWater {
	// Peers are deduplicated by IP+port.
	key := peersKey{string(p.IP), p.Port}
	if _, ok := t.peers[key]; ok {
	// Track where usable peers came from (tracker, DHT, PEX, ...).
	peersAddedBySource.Add(string(p.Source), 1)
158 func (t *Torrent) invalidateMetadata() {
159 t.metadataBytes = nil
160 t.metadataCompletedChunks = nil
164 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
168 if index >= len(t.metadataCompletedChunks) {
169 log.Printf("%s: ignoring metadata piece %d", t, index)
172 copy(t.metadataBytes[(1<<14)*index:], data)
173 t.metadataCompletedChunks[index] = true
176 func (t *Torrent) metadataPieceCount() int {
177 return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
180 func (t *Torrent) haveMetadataPiece(piece int) bool {
182 return (1<<14)*piece < len(t.metadataBytes)
184 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
188 func (t *Torrent) metadataSizeKnown() bool {
189 return t.metadataBytes != nil
192 func (t *Torrent) metadataSize() int {
193 return len(t.metadataBytes)
196 func infoPieceHashes(info *metainfo.Info) (ret []string) {
197 for i := 0; i < len(info.Pieces); i += 20 {
198 ret = append(ret, string(info.Pieces[i:i+20]))
// Called when metadata for a torrent becomes available.
// Decodes and validates the bencoded info dict, opens storage, builds the
// piece table, informs existing connections, and queues every piece for
// hash verification. Returns an error (leaving the torrent unchanged as far
// as possible) if the bytes don't match the expected infohash or are invalid.
func (t *Torrent) setInfoBytes(b []byte) error {
	var ie *metainfo.InfoEx
	err := bencode.Unmarshal(b, &ie)
	return fmt.Errorf("error unmarshalling info bytes: %s", err)
	// The decoded dict must hash back to the infohash this torrent was
	// created with; otherwise a peer fed us bogus metadata.
	if ie.Hash() != t.infoHash {
		return errors.New("info bytes have wrong hash")
	err = validateInfo(&ie.Info)
	return fmt.Errorf("bad info: %s", err)
	// Wake anything waiting on client state (e.g. for the info dict).
	t.cl.event.Broadcast()
	// Storage can only be opened once the file layout is known.
	t.storage, err = t.storageOpener.OpenTorrent(t.info)
	return fmt.Errorf("error opening torrent storage: %s", err)
	for _, f := range t.info.UpvertedFiles() {
	// Raw metadata chunk tracking is no longer needed.
	t.metadataCompletedChunks = nil
	hashes := infoPieceHashes(&t.info.Info)
	t.pieces = make([]piece, len(hashes))
	for i, hash := range hashes {
		piece := &t.pieces[i]
		// Condition variable shares the piece's pending-writes mutex.
		piece.noPendingWrites.L = &piece.pendingWritesMutex
		missinggo.CopyExact(piece.Hash[:], hash)
	// Existing connections must resize their piece bitfields.
	for _, conn := range t.conns {
		if err := conn.setNumPieces(t.numPieces()); err != nil {
			log.Printf("closing connection: %s", err)
	// Refresh cached completion and schedule every piece for hashing.
	for i := range t.pieces {
		t.updatePieceCompletion(i)
		t.pieces[i].QueuedForHash = true
	for i := range t.pieces {
260 func (t *Torrent) verifyPiece(piece int) {
261 t.cl.verifyPiece(t, piece)
264 func (t *Torrent) haveAllMetadataPieces() bool {
268 if t.metadataCompletedChunks == nil {
271 for _, have := range t.metadataCompletedChunks {
279 // TODO: Propagate errors to disconnect peer.
280 func (t *Torrent) setMetadataSize(bytes int64) (err error) {
282 // We already know the correct metadata size.
285 if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
286 return errors.New("bad size")
288 if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
291 t.metadataBytes = make([]byte, bytes)
292 t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
293 for _, c := range t.conns {
294 c.requestPendingMetadata()
// The current working name for the torrent. Either the name in the info dict,
// or a display name given such as by the dn value in a magnet link, or "".
func (t *Torrent) name() string {
// pieceState assembles the externally visible PieceState for a piece:
// priority, completion, hashing status, and partial-download status.
func (t *Torrent) pieceState(index int) (ret PieceState) {
	p := &t.pieces[index]
	ret.Priority = t.piecePriority(index)
	if t.pieceComplete(index) {
	// A piece queued for or undergoing hash verification is "checking".
	if p.QueuedForHash || p.Hashing {
	// Partial only makes sense for an incomplete piece.
	if !ret.Complete && t.piecePartiallyDownloaded(index) {
323 func (t *Torrent) metadataPieceSize(piece int) int {
324 return metadataPieceSize(len(t.metadataBytes), piece)
// newMetadataExtensionMessage builds a BEP 9 ut_metadata extended message of
// the given type for a connection, with the bencoded header followed by any
// raw piece data.
func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
	// total_size is only meaningful when we actually hold metadata bytes.
	d["total_size"] = len(t.metadataBytes)
	p, err := bencode.Marshal(d)
	// The extended message ID is whatever the peer advertised for ut_metadata.
	ExtendedID: c.PeerExtensionIDs["ut_metadata"],
	ExtendedPayload: append(p, data...),
// pieceStateRuns run-length encodes the per-piece states, producing one
// PieceStateRun per run of consecutive pieces with identical state.
func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
	rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
		ret = append(ret, PieceStateRun{
			PieceState: el.(PieceState),
	// Feed every piece's state to the encoder in order.
	for index := range t.pieces {
		rle.Append(t.pieceState(index), 1)
// Produces a small string representing a PieceStateRun.
// Format: the run length, then single characters encoding priority and flags.
func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
	ret = fmt.Sprintf("%d", psr.Length)
	ret += func() string {
		// Each priority level maps to a distinct marker character.
		switch psr.Priority {
		case PiecePriorityNext:
		case PiecePriorityNormal:
		case PiecePriorityReadahead:
		case PiecePriorityNow:
// writeStatus writes a human-readable multi-line status dump for the torrent
// (infohash, metadata progress, piece states, trackers, peer counts and
// per-connection detail) to w. Used for the client's status page.
func (t *Torrent) writeStatus(w io.Writer, cl *Client) {
	fmt.Fprintf(w, "Infohash: %x\n", t.infoHash)
	fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
	fmt.Fprintf(w, "Metadata have: ")
	// One character per 16KiB metadata chunk: have/not-have.
	for _, h := range t.metadataCompletedChunks {
		fmt.Fprintf(w, "%c", func() rune {
	fmt.Fprintf(w, "Piece length: %s\n", func() string {
		return fmt.Sprint(t.usualPieceSize())
	fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
	fmt.Fprint(w, "Piece States:")
	// Compact run-length encoded piece state summary.
	for _, psr := range t.pieceStateRuns() {
		w.Write([]byte(pieceStateRunStatusChars(psr)))
	fmt.Fprintf(w, "Reader Pieces:")
	t.forReaderOffsetPieces(func(begin, end int) (again bool) {
		fmt.Fprintf(w, " %d:%d", begin, end)
	fmt.Fprintf(w, "Trackers: ")
	for _, tier := range t.trackers {
		for _, tr := range tier {
			fmt.Fprintf(w, "%q ", tr)
	fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
	fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
	fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
	// List connections worst-first for easier triage.
	sort.Sort(&worstConns{
	for i, c := range t.conns {
		fmt.Fprintf(w, "%2d. ", i+1)
448 func (t *Torrent) haveInfo() bool {
452 // TODO: Include URIs that weren't converted to tracker clients.
453 func (t *Torrent) announceList() (al [][]string) {
454 missinggo.CastSlice(&al, t.trackers)
// Returns a run-time generated MetaInfo that includes the info bytes and
// announce-list as currently known to the client.
// Panics if called before the info bytes are available.
func (t *Torrent) metainfo() *metainfo.MetaInfo {
	if t.metadataBytes == nil {
		panic("info bytes not set")
	return &metainfo.MetaInfo{
		// Stamp generation time and provenance on the synthesized metainfo.
		CreationDate: time.Now().Unix(),
		Comment:      "dynamic metainfo from client",
		CreatedBy:    "go.torrent",
		AnnounceList: t.announceList(),
473 func (t *Torrent) bytesLeft() (left int64) {
474 for i := 0; i < t.numPieces(); i++ {
475 left += int64(t.pieces[i].bytesLeft())
480 // Bytes left to give in tracker announces.
481 func (t *Torrent) bytesLeftAnnounce() uint64 {
483 return uint64(t.bytesLeft())
485 return math.MaxUint64
489 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
490 if t.pieceComplete(piece) {
493 if t.pieceAllDirty(piece) {
496 return t.pieces[piece].hasDirtyChunks()
499 func (t *Torrent) usualPieceSize() int {
500 return int(t.info.PieceLength)
503 func (t *Torrent) lastPieceSize() int {
504 return int(t.pieceLength(t.numPieces() - 1))
507 func (t *Torrent) numPieces() int {
508 return t.info.NumPieces()
511 func (t *Torrent) numPiecesCompleted() (num int) {
512 return t.completedPieces.Len()
// close shuts the torrent down: closes storage if it supports it, closes all
// peer connections, and closes the piece-state-change pubsub.
func (t *Torrent) close() (err error) {
	// Storage implementations aren't required to be io.Closer.
	if c, ok := t.storage.(io.Closer); ok {
	for _, conn := range t.conns {
	t.pieceStateChanges.Close()
527 func (t *Torrent) requestOffset(r request) int64 {
528 return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
531 // Return the request that would include the given offset into the torrent
532 // data. Returns !ok if there is no such request.
533 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
534 return torrentOffsetRequest(t.length, t.info.PieceLength, int64(t.chunkSize), off)
// writeChunk writes received chunk data into the piece's storage at the
// given offset, timing the write. A short write is reported as
// io.ErrShortWrite.
func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
	tr := perf.NewTimer()
	n, err := t.pieces[piece].Storage().WriteAt(data, begin)
	if err == nil && n != len(data) {
		err = io.ErrShortWrite
	// Record timing for successful writes.
	tr.Stop("write chunk")
550 func (t *Torrent) bitfield() (bf []bool) {
551 bf = make([]bool, t.numPieces())
552 t.completedPieces.IterTyped(func(piece int) (again bool) {
559 func (t *Torrent) validOutgoingRequest(r request) bool {
560 if r.Index >= pp.Integer(t.info.NumPieces()) {
563 if r.Begin%t.chunkSize != 0 {
566 if r.Length > t.chunkSize {
569 pieceLength := t.pieceLength(int(r.Index))
570 if r.Begin+r.Length > pieceLength {
573 return r.Length == t.chunkSize || r.Begin+r.Length == pieceLength
576 func (t *Torrent) pieceChunks(piece int) (css []chunkSpec) {
577 css = make([]chunkSpec, 0, (t.pieceLength(piece)+t.chunkSize-1)/t.chunkSize)
579 for left := t.pieceLength(piece); left != 0; left -= cs.Length {
581 if cs.Length > t.chunkSize {
582 cs.Length = t.chunkSize
584 css = append(css, cs)
585 cs.Begin += cs.Length
590 func (t *Torrent) pieceNumChunks(piece int) int {
591 return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
594 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
595 t.pieces[pieceIndex].DirtyChunks.Clear()
	// Peer is known to support encryption.
	// Set from the source that advertised the peer (e.g. PEX flags).
	SupportsEncryption bool
607 func (t *Torrent) pieceLength(piece int) (len_ pp.Integer) {
608 if piece < 0 || piece >= t.info.NumPieces() {
611 if piece == t.numPieces()-1 {
612 len_ = pp.Integer(t.length % t.info.PieceLength)
615 len_ = pp.Integer(t.info.PieceLength)
// hashPiece computes the SHA-1 of the piece's current on-storage data,
// waiting first for any pending chunk writes to land. Read errors other
// than short data or missing files are logged.
func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
	hash := pieceHash.New()
	p := &t.pieces[piece]
	// Don't hash while chunk writes are still in flight for this piece.
	p.waitNoPendingWrites()
	ip := t.info.Piece(piece)
	n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
	missinggo.CopyExact(&ret, hash.Sum(nil))
	// Short reads and missing files are expected for incomplete pieces.
	if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
		log.Printf("unexpected error hashing piece with %T: %s", t.storage, err)
637 func (t *Torrent) haveAllPieces() bool {
641 return t.completedPieces.Len() == t.numPieces()
644 func (t *Torrent) haveAnyPieces() bool {
645 for i := range t.pieces {
646 if t.pieceComplete(i) {
653 func (t *Torrent) havePiece(index int) bool {
654 return t.haveInfo() && t.pieceComplete(index)
657 func (t *Torrent) haveChunk(r request) (ret bool) {
659 // log.Println("have chunk", r, ret)
664 if t.pieceComplete(int(r.Index)) {
667 p := &t.pieces[r.Index]
668 return !p.pendingChunk(r.chunkSpec, t.chunkSize)
671 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
672 return int(cs.Begin / chunkSize)
675 func (t *Torrent) wantPiece(r request) bool {
676 if !t.wantPieceIndex(int(r.Index)) {
679 if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
682 // TODO: What about pieces that were wanted, but aren't now, and aren't
683 // completed either? That used to be done here.
// wantPieceIndex reports whether the piece should be downloaded: not
// complete, either explicitly pended or inside some reader's window.
func (t *Torrent) wantPieceIndex(index int) bool {
	p := &t.pieces[index]
	if t.pieceComplete(index) {
	// Explicitly requested pieces are always wanted.
	if t.pendingPieces.Contains(index) {
	// Otherwise wanted iff some reader's [begin, end) window covers it.
	return !t.forReaderOffsetPieces(func(begin, end int) bool {
		return index < begin || index >= end
709 func (t *Torrent) forNeededPieces(f func(piece int) (more bool)) (all bool) {
710 return t.forReaderOffsetPieces(func(begin, end int) (more bool) {
711 for i := begin; begin < end; i++ {
720 func (t *Torrent) connHasWantedPieces(c *connection) bool {
721 return !c.pieceRequestOrder.IsEmpty()
724 func (t *Torrent) extentPieces(off, _len int64) (pieces []int) {
725 for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
726 pieces = append(pieces, int(i))
// worstBadConn pops the worst connection off the heap and returns it if it
// qualifies as "bad" (mostly unwanted chunks, or unproductive past a grace
// period while we're near the connection limit); otherwise nil.
func (t *Torrent) worstBadConn(cl *Client) *connection {
	wcs := t.worstConns(cl)
	c := heap.Pop(wcs).(*connection)
	// Mostly-useless connection: drop it.
	if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
	// Only evict merely-unproductive connections when slots are scarce.
	if wcs.Len() >= (socketsPerTorrent+1)/2 {
		// Give connections 1 minute to prove themselves.
		if time.Since(c.completedHandshake) > time.Minute {
749 type PieceStateChange struct {
// publishPieceChange publishes a PieceStateChange if the piece's current
// state differs from the last published state, caching the new state so
// duplicates are suppressed.
func (t *Torrent) publishPieceChange(piece int) {
	cur := t.pieceState(piece)
	p := &t.pieces[piece]
	if cur != p.PublicPieceState {
		p.PublicPieceState = cur
		t.pieceStateChanges.Publish(PieceStateChange{
766 func (t *Torrent) pieceNumPendingChunks(piece int) int {
767 if t.pieceComplete(piece) {
770 return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
773 func (t *Torrent) pieceAllDirty(piece int) bool {
774 return t.pieces[piece].DirtyChunks.Len() == t.pieceNumChunks(piece)
// forUrgentPieces calls f for the most urgent piece of each reader window
// (the piece at the reader's current position). Returns true iff no call to
// f returned false.
func (t *Torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
	return t.forReaderOffsetPieces(func(begin, end int) (again bool) {
788 func (t *Torrent) readersChanged() {
789 t.updatePiecePriorities()
792 func (t *Torrent) maybeNewConns() {
793 // Tickle the accept routine.
794 t.cl.event.Broadcast()
// piecePriorityChanged propagates a piece's new priority to every connection
// and publishes the resulting public state change.
func (t *Torrent) piecePriorityChanged(piece int) {
	for _, c := range t.conns {
		c.updatePiecePriority(piece)
	t.publishPieceChange(piece)
// updatePiecePriority recomputes the piece's priority and stores it,
// reporting whether it changed (no-op and false if unchanged).
func (t *Torrent) updatePiecePriority(piece int) bool {
	p := &t.pieces[piece]
	newPrio := t.piecePriorityUncached(piece)
	// Nothing to do if the priority is unchanged.
	if newPrio == p.priority {
// Update all piece priorities in one hit. This function should have the same
// output as updatePiecePriority, but across all pieces.
func (t *Torrent) updatePiecePriorities() {
	newPrios := make([]piecePriority, t.numPieces())
	// Explicitly pended pieces start at Normal.
	t.pendingPieces.IterTyped(func(piece int) (more bool) {
		newPrios[piece] = PiecePriorityNormal
	// Reader windows raise priorities: first piece Now, rest Readahead.
	t.forReaderOffsetPieces(func(begin, end int) (next bool) {
		newPrios[begin].Raise(PiecePriorityNow)
		for i := begin + 1; i < end; i++ {
			newPrios[i].Raise(PiecePriorityReadahead)
	// Completed pieces need no priority at all.
	t.completedPieces.IterTyped(func(piece int) (more bool) {
		newPrios[piece] = PiecePriorityNone
	// Apply only actual changes, notifying interested parties per piece.
	for i, prio := range newPrios {
		if prio != t.pieces[i].priority {
			t.pieces[i].priority = prio
			t.piecePriorityChanged(i)
// byteRegionPieces converts a byte extent [off, off+size) into the half-open
// piece index range [begin, end) it overlaps, clamped to the piece count.
func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
	begin = int(off / t.info.PieceLength)
	// Round the end up to cover a partial final piece.
	end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
	if end > t.info.NumPieces() {
		end = t.info.NumPieces()
// Returns true if all iterations complete without breaking.
// Calls f with the piece range covered by each reader's position plus its
// readahead window.
func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
	// There's an oppurtunity here to build a map of beginning pieces, and a
	// bitmap of the rest. I wonder if it's worth the allocation overhead.
	for r := range t.readers {
		pos, readahead := r.pos, r.readahead
		begin, end := t.byteRegionPieces(pos, readahead)
886 func (t *Torrent) piecePriority(piece int) piecePriority {
888 return PiecePriorityNone
890 return t.pieces[piece].priority
// piecePriorityUncached recomputes a single piece's priority from scratch:
// None if complete, Normal if explicitly pended, raised to Now/Readahead by
// reader windows. Should agree with updatePiecePriorities.
func (t *Torrent) piecePriorityUncached(piece int) (ret piecePriority) {
	ret = PiecePriorityNone
	if t.pieceComplete(piece) {
	if t.pendingPieces.Contains(piece) {
		ret = PiecePriorityNormal
	raiseRet := ret.Raise
	t.forReaderOffsetPieces(func(begin, end int) (again bool) {
		// First piece of a reader window is most urgent.
		raiseRet(PiecePriorityNow)
		// Remaining window pieces get readahead priority.
		if begin <= piece && piece < end {
			raiseRet(PiecePriorityReadahead)
914 func (t *Torrent) pendPiece(piece int) {
915 if t.pendingPieces.Contains(piece) {
918 if t.havePiece(piece) {
921 t.pendingPieces.Add(piece)
922 if !t.updatePiecePriority(piece) {
925 t.piecePriorityChanged(piece)
928 func (t *Torrent) getCompletedPieces() (ret bitmap.Bitmap) {
929 return t.completedPieces.Copy()
932 func (t *Torrent) unpendPieces(unpend *bitmap.Bitmap) {
933 t.pendingPieces.Sub(unpend)
934 t.updatePiecePriorities()
937 func (t *Torrent) pendPieceRange(begin, end int) {
938 for i := begin; i < end; i++ {
943 func (t *Torrent) unpendPieceRange(begin, end int) {
945 bm.AddRange(begin, end)
949 func (t *Torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) {
950 if !c.PeerHasPiece(piece) {
953 chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
954 return itertools.ForPerm(len(chunkIndices), func(i int) bool {
955 req := request{pp.Integer(piece), t.chunkIndexSpec(chunkIndices[i], piece)}
956 return c.Request(req)
960 func (t *Torrent) pendRequest(req request) {
961 ci := chunkIndex(req.chunkSpec, t.chunkSize)
962 t.pieces[req.Index].pendChunkIndex(ci)
965 func (t *Torrent) pieceChanged(piece int) {
966 t.cl.pieceChanged(t, piece)
969 func (t *Torrent) openNewConns() {
973 func (t *Torrent) getConnPieceInclination() []int {
974 _ret := t.connPieceInclinationPool.Get()
976 pieceInclinationsNew.Add(1)
977 return rand.Perm(t.numPieces())
979 pieceInclinationsReused.Add(1)
983 func (t *Torrent) putPieceInclination(pi []int) {
984 t.connPieceInclinationPool.Put(pi)
985 pieceInclinationsPut.Add(1)
988 func (t *Torrent) updatePieceCompletion(piece int) {
989 pcu := t.pieceCompleteUncached(piece)
990 changed := t.completedPieces.Get(piece) != pcu
991 t.completedPieces.Set(piece, pcu)
993 t.pieceChanged(piece)
997 // Non-blocking read. Client lock is not required.
998 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
999 p := &t.pieces[off/t.info.PieceLength]
1000 p.waitNoPendingWrites()
1001 return p.Storage().ReadAt(b, off-p.Info().Offset())
1004 func (t *Torrent) updateAllPieceCompletions() {
1005 for i := range iter.N(t.numPieces()) {
1006 t.updatePieceCompletion(i)
// maybeMetadataCompleted promotes accumulated metadata pieces into the info
// dict once all pieces have arrived; on failure the metadata is invalidated
// so it can be re-fetched.
func (t *Torrent) maybeMetadataCompleted() {
	if !t.haveAllMetadataPieces() {
		// Don't have enough metadata pieces.
	// TODO(anacrolix): If this fails, I think something harsher should be
	err := t.setInfoBytes(t.metadataBytes)
	log.Printf("error setting metadata: %s", err)
	// Bad metadata (e.g. hash mismatch): discard and start over.
	t.invalidateMetadata()
	if t.cl.config.Debug {
		log.Printf("%s: got metadata from peers", t)
1032 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1033 t.forReaderOffsetPieces(func(begin, end int) bool {
1034 ret.AddRange(begin, end)
// needData reports whether the torrent still wants data: any explicitly
// pended pieces, or any incomplete piece inside a reader window.
func (t *Torrent) needData() bool {
	if t.pendingPieces.Len() != 0 {
	// IterTyped returns false if the callback broke out, i.e. some reader
	// piece is incomplete.
	return !t.readerPieces().IterTyped(func(piece int) bool {
		return t.pieceComplete(piece)
1052 func (t *Torrent) addTrackers(announceList [][]string) {
1053 newTrackers := copyTrackers(t.trackers)
1054 for tierIndex, tier := range announceList {
1055 if tierIndex < len(newTrackers) {
1056 newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier)
1058 newTrackers = append(newTrackers, mergeTier(nil, tier))
1060 shuffleTier(newTrackers[tierIndex])
1062 t.trackers = newTrackers
1065 // Don't call this before the info is available.
1066 func (t *Torrent) bytesCompleted() int64 {
1070 return t.info.TotalLength() - t.bytesLeft()
// SetInfoBytes sets the torrent's info dict from raw bencoded bytes, under
// the client lock. Exported entry point wrapping setInfoBytes.
func (t *Torrent) SetInfoBytes(b []byte) (err error) {
	defer t.cl.mu.Unlock()
	return t.setInfoBytes(b)
// Returns true if connection is removed from torrent.Conns.
// Removal is O(1): the match is swapped with the final element and the
// slice truncated (connection order is not preserved).
func (t *Torrent) deleteConnection(c *connection) bool {
	for i0, _c := range t.conns {
		i1 := len(t.conns) - 1
		// Swap-remove: move the last connection into the vacated slot.
		t.conns[i0] = t.conns[i1]
		t.conns = t.conns[:i1]
// dropConnection closes and removes a connection, waking the client so it
// can consider replacements.
func (t *Torrent) dropConnection(c *connection) {
	t.cl.event.Broadcast()
	// Only react further if the connection was actually still registered.
	if t.deleteConnection(c) {
// Returns true when peers are required, or false if the torrent is closing.
// Blocks (under the client lock) until one of those conditions holds.
func (t *Torrent) waitWantPeers() bool {
	defer t.cl.mu.Unlock()
	// Closing torrent: stop announcing for peers.
	if t.closed.IsSet() {
	// Reserve is already comfortably stocked.
	if len(t.peers) > torrentPeersLowWater {
	// Only want peers while there's data to move (download or seed).
	if t.needData() || t.seeding() {
// Returns whether the client should make effort to seed the torrent.
// Requires uploading to be enabled and seeding to be configured.
func (t *Torrent) seeding() bool {
	if cl.config.NoUpload {
	if !cl.config.Seed {
// Announce torrent to its trackers.
// Long-running loop: performs a fast-start announce to all trackers, then
// repeatedly walks the tiers in order (BEP 12), announcing to the first
// tracker that succeeds, floating it to the top of its tier, and sleeping
// the tracker-supplied interval. Exits when peers are no longer wanted.
func (t *Torrent) announceTrackers() {
	req := tracker.AnnounceRequest{
		Event: tracker.Started,
		Port:     uint16(cl.incomingPeerPort()),
		InfoHash: t.infoHash,
	if !t.waitWantPeers() {
	req.Left = t.bytesLeftAnnounce()
	trackers := t.trackers
	// After a successful Started event, subsequent announces are periodic.
	if t.announceTrackersFastStart(&req, trackers) {
		req.Event = tracker.None
	for t.waitWantPeers() {
		// Refresh state that may have changed since the last round.
		req.Left = t.bytesLeftAnnounce()
		trackers = t.trackers
		numTrackersTried := 0
		for _, tier := range trackers {
			for trIndex, tr := range tier {
				interval, err := t.announceSingleTracker(tr, &req)
				// Try the next tracker.
				// Float the successful announce to the top of the tier. If
				// the trackers list has been changed, we'll be modifying an
				// old copy so it won't matter.
				tier[0], tier[trIndex] = tier[trIndex], tier[0]
				req.Event = tracker.None
				// Wait the interval before attempting another announce.
				time.Sleep(interval)
				continue newAnnounce
		if numTrackersTried != 0 {
			log.Printf("%s: all trackers failed", t)
		// TODO: Wait until trackers are added if there are none.
		time.Sleep(10 * time.Second)
// announceTrackersFastStart fires an announce at every tracker concurrently
// and reports whether at least one succeeded. Used for the initial Started
// announce so a single slow tier doesn't delay peer discovery.
func (t *Torrent) announceTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier) (atLeastOne bool) {
	// Each goroutine reports its success on oks.
	oks := make(chan bool)
	for _, tier := range trackers {
		for _, tr := range tier {
			go func(tr string) {
				_, err := t.announceSingleTracker(tr, req)
	// Drain all outstanding results.
	for outstanding > 0 {
// announceSingleTracker announces to one tracker URL, converting its peers
// into the torrent's reserve, and returns the tracker's requested re-announce
// interval. Blocked trackers (per client policy) fail without a network call.
func (t *Torrent) announceSingleTracker(tr string, req *tracker.AnnounceRequest) (interval time.Duration, err error) {
	// Client-level IP blocklist check, done outside the client lock.
	blocked, err := t.cl.trackerBlockedUnlocked(tr)
	err = fmt.Errorf("error determining if tracker blocked: %s", err)
	err = errors.New("tracker has blocked IP")
	resp, err := tracker.Announce(tr, req)
	// Convert the tracker's compact peers into our Peer records.
	for _, peer := range resp.Peers {
		peers = append(peers, Peer{
	// The tracker dictates how long to wait before the next announce.
	interval = time.Second * time.Duration(resp.Interval)