13 "bitbucket.org/anacrolix/go.torrent/data/file"
15 pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
16 "bitbucket.org/anacrolix/go.torrent/tracker"
17 "bitbucket.org/anacrolix/go.torrent/util"
18 "github.com/anacrolix/libtorgo/bencode"
19 "github.com/anacrolix/libtorgo/metainfo"
// PieceNumPendingBytes returns the number of bytes of the given piece that
// still need downloading. A piece that has never been hashed counts as
// entirely pending. Otherwise the count is pending-chunks * chunkSize,
// corrected for the (possibly shorter) final chunk of the piece.
// NOTE(review): this view of the source is elided — closing braces and some
// statements between the visible lines are missing.
22 func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) {
23 piece := t.Pieces[index]
24 if !piece.EverHashed {
25 return t.PieceLength(index)
27 pendingChunks := t.Pieces[index].PendingChunkSpecs
28 count = pp.Integer(len(pendingChunks)) * chunkSize
29 _lastChunkSpec := lastChunkSpec(t.PieceLength(index))
30 if _lastChunkSpec.Length != chunkSize {
// The last chunk is shorter than chunkSize; if it is still pending, adjust
// the count down by the difference so short chunks aren't over-counted.
31 if _, ok := pendingChunks[_lastChunkSpec]; ok {
32 count += _lastChunkSpec.Length - chunkSize
38 type peersKey struct {
// torrentData abstracts the storage backing a torrent's data: random-access
// reads and writes plus a streaming copy of an arbitrary byte section.
43 type torrentData interface {
44 ReadAt(p []byte, off int64) (n int, err error)
46 WriteAt(p []byte, off int64) (n int, err error)
// WriteSectionTo streams n bytes starting at off into w (used for hashing).
47 WriteSectionTo(w io.Writer, off, n int64) (written int64, err error)
50 // Is not aware of Client.
55 // Closed when no more network activity is desired. This includes
56 // announcing, and communicating with peers.
57 ceasingNetworking chan struct{}
63 // Prevent mutations to Data memory maps while in use as they're not safe.
68 // Active peer connections.
70 // Set of addrs to which we're attempting to connect.
71 HalfOpen map[string]struct{}
73 // Reserve of peers to connect to. A peer can be both here and in the
74 // active connections if were told about the peer after connecting with
75 // them. That encourages us to reconnect to peers that are well known.
76 Peers map[peersKey]Peer
79 // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
80 // mirror their respective URLs from the announce-list key.
81 Trackers [][]tracker.Client
// gotMetainfo is the writable side; GotMetainfo is the receive-only view
// exposed to consumers — presumably closed when metainfo becomes available
// (TODO confirm against the elided constructor).
86 gotMetainfo chan struct{}
87 GotMetainfo <-chan struct{}
// numConnsUnchoked iterates the active connections; by its name it counts
// those that are unchoked (the counting condition is elided from this view).
90 func (t *torrent) numConnsUnchoked() (num int) {
91 for _, c := range t.Conns {
// addrActive reports whether addr is already in use: either a half-open
// outgoing attempt, or the remote address of an active connection.
99 func (t *torrent) addrActive(addr string) bool {
100 if _, ok := t.HalfOpen[addr]; ok {
103 for _, c := range t.Conns {
104 if c.Socket.RemoteAddr().String() == addr {
// worstConnsHeap builds a worstConns container over a copy of the active
// connections, so heap operations don't reorder t.Conns itself.
111 func (t *torrent) worstConnsHeap() (wcs *worstConns) {
113 c: append([]*connection{}, t.Conns...),
// ceaseNetworking signals that all network activity (announcing, peer
// traffic) should stop. The select on ceasingNetworking makes the call
// idempotent: if the channel is already closed, the elided case returns
// early instead of closing it twice. Active connections are then iterated —
// presumably to close each one (loop body elided; TODO confirm).
120 func (t *torrent) ceaseNetworking() {
122 defer t.stateMu.Unlock()
124 case <-t.ceasingNetworking:
128 close(t.ceasingNetworking)
129 for _, c := range t.Conns {
// AddPeers adds candidate peers to the reserve, keyed by IP bytes and port.
// Re-adding an existing key overwrites the previous entry.
134 func (t *torrent) AddPeers(pp []Peer) {
135 for _, p := range pp {
136 t.Peers[peersKey{string(p.IP), p.Port}] = p
140 func (t *torrent) InvalidateMetadata() {
// SaveMetadataPiece stores one 16 KiB (1<<14) metadata-extension piece at its
// offset in MetaData and marks it received. Out-of-range indices are logged
// and ignored rather than panicking.
146 func (t *torrent) SaveMetadataPiece(index int, data []byte) {
150 if index >= len(t.metadataHave) {
151 log.Printf("%s: ignoring metadata piece %d", t, index)
154 copy(t.MetaData[(1<<14)*index:], data)
155 t.metadataHave[index] = true
// MetadataPieceCount returns the number of 16 KiB metadata pieces, i.e. the
// metadata size divided by 1<<14, rounded up.
158 func (t *torrent) MetadataPieceCount() int {
159 return (len(t.MetaData) + (1 << 14) - 1) / (1 << 14)
// HaveMetadataPiece reports whether the given metadata piece is available.
// Two paths are visible: one bounds-checks the piece offset against the full
// MetaData buffer, the other consults the metadataHave bitmap — the branch
// condition selecting between them is elided (likely haveInfo; TODO confirm).
162 func (t *torrent) HaveMetadataPiece(piece int) bool {
164 return (1<<14)*piece < len(t.MetaData)
166 return piece < len(t.metadataHave) && t.metadataHave[piece]
// metadataSizeKnown reports whether the metadata buffer has been allocated
// (its size becomes known via SetMetadataSize or setMetadata).
170 func (t *torrent) metadataSizeKnown() bool {
171 return t.MetaData != nil
// metadataSize returns the size in bytes of the (possibly incomplete)
// metadata buffer; zero when the size is not yet known.
174 func (t *torrent) metadataSize() int {
175 return len(t.MetaData)
// infoPieceHashes splits the concatenated piece-hash blob from the info
// dictionary into individual 20-byte (SHA-1 sized) hash strings.
178 func infoPieceHashes(info *metainfo.Info) (ret []string) {
179 for i := 0; i < len(info.Pieces); i += 20 {
180 ret = append(ret, string(info.Pieces[i:i+20]))
185 // Called when metadata for a torrent becomes available.
// Populates t.Info, the raw metadata bytes, and the per-piece state (hash,
// completion event locker), then updates each existing connection for the
// now-known piece count and opens the file-backed data storage.
186 func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte, eventLocker sync.Locker) (err error) {
187 t.Info = newMetaInfo(&md)
189 for _, f := range t.Info.UpvertedFiles() {
192 t.MetaData = infoBytes
194 for _, hash := range infoPieceHashes(&md) {
196 piece.Event.L = eventLocker
// CopyExact panics on length mismatch, so each hash must be exactly 20 bytes.
197 util.CopyExact(piece.Hash[:], hash)
198 t.Pieces = append(t.Pieces, piece)
200 for _, conn := range t.Conns {
201 t.initRequestOrdering(conn)
// Connections that can't adapt to the real piece count are dropped.
202 if err := conn.setNumPieces(t.numPieces()); err != nil {
203 log.Printf("closing connection: %s", err)
207 t.Data, err = file.TorrentData(&md, dataDir)
209 err = fmt.Errorf("error mmap'ing torrent data: %s", err)
// HaveAllMetadataPieces reports whether every metadata-extension piece has
// been received. A nil metadataHave (size unknown) means no.
215 func (t *torrent) HaveAllMetadataPieces() bool {
219 if t.metadataHave == nil {
222 for _, have := range t.metadataHave {
// SetMetadataSize allocates the metadata buffer and the per-piece received
// bitmap once a peer announces the metadata size. No-ops if the buffer
// already exists; the elided branch presumably rejects sizes over the 10 MB
// sanity cap.
230 func (t *torrent) SetMetadataSize(bytes int64) {
231 if t.MetaData != nil {
234 if bytes > 10000000 { // 10MB, pulled from my ass.
237 t.MetaData = make([]byte, bytes)
// One flag per 16 KiB piece, rounded up.
238 t.metadataHave = make([]bool, (bytes+(1<<14)-1)/(1<<14))
// Name returns a human-readable name for the torrent: the display name when
// set, otherwise the hex-encoded infohash. (An elided branch likely prefers
// the info-dictionary name when metadata is available — TODO confirm.)
241 func (t *torrent) Name() string {
245 if t.DisplayName != "" {
248 return t.InfoHash.HexString()
// pieceStatusChar maps a piece's state to a single status character for the
// compact textual display (see pieceStatusCharSequences). Only some cases
// are visible here; the return characters for most priorities are elided.
251 func (t *torrent) pieceStatusChar(index int) byte {
256 case p.QueuedForHash:
262 case t.PiecePartiallyDownloaded(index):
264 case piecePriorityNone:
265 return 'F' // Forgotten
271 case piecePriorityNone:
273 case piecePriorityNow:
275 case piecePriorityReadahead:
277 case piecePriorityNext:
// metadataPieceSize returns the byte length of the given metadata piece,
// delegating to the package-level helper with this torrent's metadata size.
285 func (t *torrent) metadataPieceSize(piece int) int {
286 return metadataPieceSize(len(t.MetaData), piece)
// NewMetadataExtensionMessage builds a BEP 9 ut_metadata extended message for
// connection c: a bencoded header dict (including total_size) followed by the
// optional raw piece data. The dict's msg_type/piece assignments are elided.
289 func (t *torrent) NewMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
295 d["total_size"] = len(t.MetaData)
297 p, err := bencode.Marshal(d)
// The extension ID is whatever the peer advertised for "ut_metadata" in its
// extended handshake.
303 ExtendedID: byte(c.PeerExtensionIDs["ut_metadata"]),
304 ExtendedPayload: append(p, data...),
308 type PieceStatusCharSequence struct {
// PieceStatusCharSequences is the exported, state-lock-holding wrapper around
// pieceStatusCharSequences.
313 func (t *torrent) PieceStatusCharSequences() []PieceStatusCharSequence {
315 defer t.stateMu.Unlock()
316 return t.pieceStatusCharSequences()
319 // Returns the length of sequences of identical piece status chars.
// Run-length encodes pieceStatusChar over all pieces: walks the pieces,
// flushing a sequence each time the status character changes (the
// accumulation/flush logic between the visible lines is elided).
320 func (t *torrent) pieceStatusCharSequences() (ret []PieceStatusCharSequence) {
325 writeSequence := func() {
326 ret = append(ret, PieceStatusCharSequence{char, count})
328 if len(t.Pieces) != 0 {
329 char = t.pieceStatusChar(0)
331 for index := range t.Pieces {
332 char1 := t.pieceStatusChar(index)
// WriteStatus writes a human-readable, multi-line status dump of the torrent
// to w: infohash, piece length, run-length piece status, trackers, and peer
// counts, followed by the connections sorted worst-first.
347 func (t *torrent) WriteStatus(w io.Writer) {
348 fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash)
// Piece length is rendered via a closure — presumably so a placeholder can
// be printed when metadata (and thus piece length) is unknown; TODO confirm.
349 fmt.Fprintf(w, "Piece length: %s\n", func() string {
351 return fmt.Sprint(t.UsualPieceSize())
357 fmt.Fprint(w, "Pieces: ")
358 for _, seq := range t.pieceStatusCharSequences() {
359 fmt.Fprintf(w, "%d%c ", seq.Count, seq.Char)
363 fmt.Fprintf(w, "Trackers: ")
364 for _, tier := range t.Trackers {
365 for _, tr := range tier {
366 fmt.Fprintf(w, "%q ", tr.String())
370 fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers))
371 fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen))
372 fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns))
373 sort.Sort(&worstConns{
377 for _, c := range t.Conns {
// String implements fmt.Stringer; falls back to the hex infohash (an elided
// branch likely prefers the torrent name when available — TODO confirm).
382 func (t *torrent) String() string {
385 s = fmt.Sprintf("%x", t.InfoHash)
390 func (t *torrent) haveInfo() bool {
394 // TODO: Include URIs that weren't converted to tracker clients.
// AnnounceList reconstructs a BEP 12 announce-list (tiers of tracker URLs)
// from the live tracker.Client instances.
395 func (t *torrent) AnnounceList() (al [][]string) {
396 for _, tier := range t.Trackers {
398 for _, tr := range tier {
399 l = append(l, tr.URL())
// MetaInfo synthesizes a metainfo.MetaInfo from the client's current state.
// Panics if the raw info bytes have not been obtained yet — callers must
// ensure metadata is present first.
406 func (t *torrent) MetaInfo() *metainfo.MetaInfo {
407 if t.MetaData == nil {
408 panic("info bytes not set")
410 return &metainfo.MetaInfo{
411 Info: metainfo.InfoEx{
415 CreationDate: time.Now().Unix(),
416 Comment: "dynamic metainfo from client",
417 CreatedBy: "go.torrent",
418 AnnounceList: t.AnnounceList(),
// BytesLeft sums the pending byte counts of every piece; this is the "left"
// figure reported to trackers.
422 func (t *torrent) BytesLeft() (left int64) {
426 for i := pp.Integer(0); i < pp.Integer(t.numPieces()); i++ {
427 left += int64(t.PieceNumPendingBytes(i))
// PiecePartiallyDownloaded reports whether any byte of the piece has been
// obtained, i.e. its pending byte count is less than its full length.
432 func (t *torrent) PiecePartiallyDownloaded(index int) bool {
433 return t.PieceNumPendingBytes(pp.Integer(index)) != t.PieceLength(pp.Integer(index))
// NumChunksForPiece returns how many chunks a piece of pieceSize occupies:
// pieceSize / chunkSize, rounded up.
436 func NumChunksForPiece(chunkSize int, pieceSize int) int {
437 return (pieceSize + chunkSize - 1) / chunkSize
// UsualPieceSize returns the nominal piece length from the info dictionary
// (every piece except possibly the last is this size).
440 func (t *torrent) UsualPieceSize() int {
441 return int(t.Info.PieceLength)
// LastPieceSize returns the length of the final piece, which may be shorter
// than the nominal piece length.
444 func (t *torrent) LastPieceSize() int {
445 return int(t.PieceLength(pp.Integer(t.numPieces() - 1)))
// numPieces derives the piece count from the concatenated 20-byte piece
// hashes in the info dictionary.
448 func (t *torrent) numPieces() int {
449 return len(t.Info.Pieces) / 20
// NumPiecesCompleted counts fully-downloaded-and-verified pieces (the
// counting condition inside the loop is elided from this view).
452 func (t *torrent) NumPiecesCompleted() (num int) {
453 for _, p := range t.Pieces {
461 func (t *torrent) Length() int64 {
465 func (t *torrent) isClosed() bool {
// close tears down the torrent, iterating the active connections (the
// per-connection teardown and the rest of the body are elided).
474 func (t *torrent) close() (err error) {
486 for _, conn := range t.Conns {
492 // Return the request that would include the given offset into the torrent data.
// Pure helper: maps a byte offset to the (piece index, chunk-aligned begin,
// length) request covering it. ok is false for offsets outside [0, length);
// the final chunk of the torrent is truncated to the bytes that remain.
493 func torrentOffsetRequest(torrentLength, pieceSize, chunkSize, offset int64) (
494 r request, ok bool) {
495 if offset < 0 || offset >= torrentLength {
498 r.Index = pp.Integer(offset / pieceSize)
// Round the intra-piece offset down to a chunk boundary.
499 r.Begin = pp.Integer(offset % pieceSize / chunkSize * chunkSize)
500 left := torrentLength - int64(r.Index)*pieceSize - int64(r.Begin)
501 if chunkSize < left {
502 r.Length = pp.Integer(chunkSize)
504 r.Length = pp.Integer(left)
// torrentRequestOffset is the inverse of torrentOffsetRequest: converts a
// request back to its absolute byte offset. Panics (programmer error) if the
// request lies outside the torrent.
510 func torrentRequestOffset(torrentLength, pieceSize int64, r request) (off int64) {
511 off = int64(r.Index)*pieceSize + int64(r.Begin)
512 if off < 0 || off >= torrentLength {
513 panic("invalid request")
// requestOffset converts a request to a torrent-absolute byte offset using
// this torrent's length and nominal piece size.
518 func (t *torrent) requestOffset(r request) int64 {
519 return torrentRequestOffset(t.Length(), int64(t.UsualPieceSize()), r)
522 // Return the request that would include the given offset into the torrent data.
523 func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
524 return torrentOffsetRequest(t.Length(), t.Info.PieceLength, chunkSize, off)
// WriteChunk writes received chunk data into storage at the absolute offset
// piece*pieceLength+begin.
527 func (t *torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
528 _, err = t.Data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
// bitfield returns one bool per piece: true when the piece has been hashed
// and has no pending chunks (i.e. it is complete and verified).
532 func (t *torrent) bitfield() (bf []bool) {
533 for _, p := range t.Pieces {
534 bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0)
// pieceChunks enumerates the chunk specs that tile the given piece: chunks of
// chunkSize from Begin 0 upward, with a shorter final chunk if the piece
// length is not a multiple of chunkSize. Capacity is pre-sized to the exact
// chunk count.
539 func (t *torrent) pieceChunks(piece int) (css []chunkSpec) {
540 css = make([]chunkSpec, 0, (t.PieceLength(pp.Integer(piece))+chunkSize-1)/chunkSize)
542 for left := t.PieceLength(pp.Integer(piece)); left != 0; left -= cs.Length {
544 if cs.Length > chunkSize {
545 cs.Length = chunkSize
547 css = append(css, cs)
548 cs.Begin += cs.Length
// pendAllChunkSpecs marks every chunk of the piece as needing download,
// lazily allocating the pending-chunks set sized to the piece's chunk count.
// NOTE(review): mutating through the local `piece` only affects t.Pieces if
// the element is a pointer — looks like it is (see setMetadata's append of a
// locally-built piece); confirm against the elided piece/Pieces declarations.
553 func (t *torrent) pendAllChunkSpecs(index pp.Integer) {
554 piece := t.Pieces[index]
555 if piece.PendingChunkSpecs == nil {
556 piece.PendingChunkSpecs = make(
557 map[chunkSpec]struct{},
558 (t.PieceLength(index)+chunkSize-1)/chunkSize)
560 pcss := piece.PendingChunkSpecs
561 for _, cs := range t.pieceChunks(int(index)) {
562 pcss[cs] = struct{}{}
// PieceLength returns the byte length of the given piece: the remainder of
// the torrent length for the final piece, the nominal piece length otherwise.
// NOTE(review): for a torrent whose length is an exact multiple of the piece
// length the visible modulo yields 0 for the last piece — the elided lines
// between these may correct that; confirm in the full source.
574 func (t *torrent) PieceLength(piece pp.Integer) (len_ pp.Integer) {
575 if int(piece) == t.numPieces()-1 {
576 len_ = pp.Integer(t.Length() % t.Info.PieceLength)
579 len_ = pp.Integer(t.Info.PieceLength)
// HashPiece hashes the piece's stored data and returns the digest as a
// pieceSum. Streams the piece region from storage directly into the hash.
// NOTE(review): WriteSectionTo's (written, err) results appear unchecked on
// the visible line — a short read would silently produce a wrong hash;
// confirm against the elided surrounding lines.
584 func (t *torrent) HashPiece(piece pp.Integer) (ps pieceSum) {
585 hash := pieceHash.New()
587 t.Data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
589 util.CopyExact(ps[:], hash.Sum(nil))
// haveAllPieces reports whether every piece is complete (an elided early
// branch presumably returns false before metadata is known).
592 func (t *torrent) haveAllPieces() bool {
596 for _, piece := range t.Pieces {
597 if !piece.Complete() {
// haveAnyPieces reports whether at least one piece is complete.
604 func (me *torrent) haveAnyPieces() bool {
605 for _, piece := range me.Pieces {
606 if piece.Complete() {
// havePiece reports whether the piece is complete; always false before the
// info dictionary is known (Pieces would not be populated).
613 func (t *torrent) havePiece(index int) bool {
614 return t.haveInfo() && t.Pieces[index].Complete()
// haveChunk reports whether the specific chunk of a piece has been obtained —
// i.e. it is no longer in the piece's pending set (the visible lookup's use
// of ok, and any early-outs, are elided).
617 func (t *torrent) haveChunk(r request) bool {
618 p := t.Pieces[r.Index]
622 _, ok := p.PendingChunkSpecs[r.chunkSpec]
// wantChunk reports whether the chunk should be requested: its piece must be
// wanted, and the chunk must still be in that piece's pending set.
626 func (t *torrent) wantChunk(r request) bool {
627 if !t.wantPiece(int(r.Index)) {
630 _, ok := t.Pieces[r.Index].PendingChunkSpecs[r.chunkSpec]
// wantPiece reports whether the piece should be downloaded: it has been
// hashed before (so its pending set is meaningful), still has pending
// chunks, and its priority is not "none".
634 func (t *torrent) wantPiece(index int) bool {
639 return p.EverHashed && len(p.PendingChunkSpecs) != 0 && p.Priority != piecePriorityNone
// connHasWantedPieces reports whether peer connection c has at least one
// piece that this torrent still wants.
642 func (t *torrent) connHasWantedPieces(c *connection) bool {
643 for p := range t.Pieces {
644 if t.wantPiece(p) && c.PeerHasPiece(pp.Integer(p)) {
651 func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
652 for i := off / int64(t.UsualPieceSize()); i*int64(t.UsualPieceSize()) < off+_len; i++ {
653 pieces = append(pieces, int(i))