]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Publicly expose Torrent.GotInfo
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "fmt"
6         "io"
7         "log"
8         "net"
9         "sort"
10         "sync"
11         "time"
12
13         "github.com/bradfitz/iter"
14
15         "github.com/anacrolix/torrent/bencode"
16         "github.com/anacrolix/torrent/data"
17         "github.com/anacrolix/torrent/metainfo"
18         pp "github.com/anacrolix/torrent/peer_protocol"
19         "github.com/anacrolix/torrent/tracker"
20         "github.com/anacrolix/torrent/util"
21 )
22
23 func (t *torrent) pieceNumPendingBytes(index int) (count pp.Integer) {
24         if t.pieceComplete(index) {
25                 return 0
26         }
27         piece := t.Pieces[index]
28         if !piece.EverHashed {
29                 return t.pieceLength(index)
30         }
31         pendingChunks := t.Pieces[index].PendingChunkSpecs
32         count = pp.Integer(len(pendingChunks)) * chunkSize
33         _lastChunkSpec := lastChunkSpec(t.pieceLength(index))
34         if _lastChunkSpec.Length != chunkSize {
35                 if _, ok := pendingChunks[_lastChunkSpec]; ok {
36                         count += _lastChunkSpec.Length - chunkSize
37                 }
38         }
39         return
40 }
41
42 type peersKey struct {
43         IPBytes string
44         Port    int
45 }
46
47 // Data maintains per-piece persistent state.
48 type StatefulData interface {
49         data.Data
50         // We believe the piece data will pass a hash check.
51         PieceCompleted(index int) error
52         // Returns true if the piece is complete.
53         PieceComplete(index int) bool
54 }
55
56 // Is not aware of Client. Maintains state of torrent for with-in a Client.
57 type torrent struct {
58         stateMu sync.Mutex
59         closing chan struct{}
60
61         // Closed when no more network activity is desired. This includes
62         // announcing, and communicating with peers.
63         ceasingNetworking chan struct{}
64
65         InfoHash InfoHash
66         Pieces   []*piece
67         // Chunks that are wanted before all others. This is for
68         // responsive/streaming readers that want to unblock ASAP.
69         urgent map[request]struct{}
70         // Total length of the torrent in bytes. Stored because it's not O(1) to
71         // get this from the info dict.
72         length int64
73
74         data StatefulData
75
76         // The info dict. Nil if we don't have it (yet).
77         Info *metainfo.Info
78         // Active peer connections, running message stream loops.
79         Conns []*connection
80         // Set of addrs to which we're attempting to connect. Connections are
81         // half-open until all handshakes are completed.
82         HalfOpen map[string]struct{}
83
84         // Reserve of peers to connect to. A peer can be both here and in the
85         // active connections if were told about the peer after connecting with
86         // them. That encourages us to reconnect to peers that are well known.
87         Peers     map[peersKey]Peer
88         wantPeers sync.Cond
89
90         // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
91         // mirror their respective URLs from the announce-list metainfo key.
92         Trackers [][]tracker.Client
93         // Name used if the info name isn't available.
94         DisplayName string
95         // The bencoded bytes of the info dict.
96         MetaData []byte
97         // Each element corresponds to the 16KiB metadata pieces. If true, we have
98         // received that piece.
99         metadataHave []bool
100
101         // Closed when .Info is set.
102         gotMetainfo chan struct{}
103
104         pruneTimer *time.Timer
105 }
106
107 func (t *torrent) pieceComplete(piece int) bool {
108         // TODO: This is called when setting metadata, and before storage is
109         // assigned, which doesn't seem right.
110         return t.data != nil && t.data.PieceComplete(piece)
111 }
112
113 func (t *torrent) numConnsUnchoked() (num int) {
114         for _, c := range t.Conns {
115                 if !c.PeerChoked {
116                         num++
117                 }
118         }
119         return
120 }
121
122 // There's a connection to that address already.
123 func (t *torrent) addrActive(addr string) bool {
124         if _, ok := t.HalfOpen[addr]; ok {
125                 return true
126         }
127         for _, c := range t.Conns {
128                 if c.remoteAddr().String() == addr {
129                         return true
130                 }
131         }
132         return false
133 }
134
135 func (t *torrent) worstConnsHeap() (wcs *worstConns) {
136         wcs = &worstConns{
137                 c: append([]*connection{}, t.Conns...),
138                 t: t,
139         }
140         heap.Init(wcs)
141         return
142 }
143
144 func (t *torrent) ceaseNetworking() {
145         t.stateMu.Lock()
146         defer t.stateMu.Unlock()
147         select {
148         case <-t.ceasingNetworking:
149                 return
150         default:
151         }
152         close(t.ceasingNetworking)
153         for _, c := range t.Conns {
154                 c.Close()
155         }
156         if t.pruneTimer != nil {
157                 t.pruneTimer.Stop()
158         }
159 }
160
161 func (t *torrent) addPeer(p Peer) {
162         t.Peers[peersKey{string(p.IP), p.Port}] = p
163 }
164
165 func (t *torrent) invalidateMetadata() {
166         t.MetaData = nil
167         t.metadataHave = nil
168         t.Info = nil
169 }
170
171 func (t *torrent) saveMetadataPiece(index int, data []byte) {
172         if t.haveInfo() {
173                 return
174         }
175         if index >= len(t.metadataHave) {
176                 log.Printf("%s: ignoring metadata piece %d", t, index)
177                 return
178         }
179         copy(t.MetaData[(1<<14)*index:], data)
180         t.metadataHave[index] = true
181 }
182
183 func (t *torrent) metadataPieceCount() int {
184         return (len(t.MetaData) + (1 << 14) - 1) / (1 << 14)
185 }
186
187 func (t *torrent) haveMetadataPiece(piece int) bool {
188         if t.haveInfo() {
189                 return (1<<14)*piece < len(t.MetaData)
190         } else {
191                 return piece < len(t.metadataHave) && t.metadataHave[piece]
192         }
193 }
194
195 func (t *torrent) metadataSizeKnown() bool {
196         return t.MetaData != nil
197 }
198
199 func (t *torrent) metadataSize() int {
200         return len(t.MetaData)
201 }
202
203 func infoPieceHashes(info *metainfo.Info) (ret []string) {
204         for i := 0; i < len(info.Pieces); i += 20 {
205                 ret = append(ret, string(info.Pieces[i:i+20]))
206         }
207         return
208 }
209
210 // Called when metadata for a torrent becomes available.
211 func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte, eventLocker sync.Locker) (err error) {
212         t.Info = md
213         t.length = 0
214         for _, f := range t.Info.UpvertedFiles() {
215                 t.length += f.Length
216         }
217         t.MetaData = infoBytes
218         t.metadataHave = nil
219         for _, hash := range infoPieceHashes(md) {
220                 piece := &piece{}
221                 piece.Event.L = eventLocker
222                 util.CopyExact(piece.Hash[:], hash)
223                 t.Pieces = append(t.Pieces, piece)
224         }
225         for _, conn := range t.Conns {
226                 t.initRequestOrdering(conn)
227                 if err := conn.setNumPieces(t.numPieces()); err != nil {
228                         log.Printf("closing connection: %s", err)
229                         conn.Close()
230                 }
231         }
232         return
233 }
234
235 func (t *torrent) setStorage(td data.Data) (err error) {
236         if c, ok := t.data.(io.Closer); ok {
237                 c.Close()
238         }
239         if sd, ok := td.(StatefulData); ok {
240                 t.data = sd
241         } else {
242                 t.data = &statelessDataWrapper{td, make([]bool, t.Info.NumPieces())}
243         }
244         return
245 }
246
247 func (t *torrent) haveAllMetadataPieces() bool {
248         if t.haveInfo() {
249                 return true
250         }
251         if t.metadataHave == nil {
252                 return false
253         }
254         for _, have := range t.metadataHave {
255                 if !have {
256                         return false
257                 }
258         }
259         return true
260 }
261
262 func (t *torrent) setMetadataSize(bytes int64, cl *Client) {
263         if t.haveInfo() {
264                 // We already know the correct metadata size.
265                 return
266         }
267         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
268                 log.Printf("received bad metadata size: %d", bytes)
269                 return
270         }
271         if t.MetaData != nil && len(t.MetaData) == int(bytes) {
272                 return
273         }
274         t.MetaData = make([]byte, bytes)
275         t.metadataHave = make([]bool, (bytes+(1<<14)-1)/(1<<14))
276         for _, c := range t.Conns {
277                 cl.requestPendingMetadata(t, c)
278         }
279
280 }
281
282 // The current working name for the torrent. Either the name in the info dict,
283 // or a display name given such as by the dn value in a magnet link, or "".
284 func (t *torrent) Name() string {
285         if t.haveInfo() {
286                 return t.Info.Name
287         }
288         if t.DisplayName != "" {
289                 return t.DisplayName
290         }
291         return ""
292 }
293
294 func (t *torrent) pieceStatusChar(index int) byte {
295         p := t.Pieces[index]
296         switch {
297         case t.pieceComplete(index):
298                 return 'C'
299         case p.QueuedForHash:
300                 return 'Q'
301         case p.Hashing:
302                 return 'H'
303         case !p.EverHashed:
304                 return '?'
305         case t.piecePartiallyDownloaded(index):
306                 switch p.Priority {
307                 case piecePriorityNone:
308                         return 'F' // Forgotten
309                 default:
310                         return 'P'
311                 }
312         default:
313                 switch p.Priority {
314                 case piecePriorityNone:
315                         return 'z'
316                 case piecePriorityNow:
317                         return '!'
318                 case piecePriorityReadahead:
319                         return 'R'
320                 case piecePriorityNext:
321                         return 'N'
322                 default:
323                         return '.'
324                 }
325         }
326 }
327
328 func (t *torrent) metadataPieceSize(piece int) int {
329         return metadataPieceSize(len(t.MetaData), piece)
330 }
331
332 func (t *torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
333         d := map[string]int{
334                 "msg_type": msgType,
335                 "piece":    piece,
336         }
337         if data != nil {
338                 d["total_size"] = len(t.MetaData)
339         }
340         p, err := bencode.Marshal(d)
341         if err != nil {
342                 panic(err)
343         }
344         return pp.Message{
345                 Type:            pp.Extended,
346                 ExtendedID:      byte(c.PeerExtensionIDs["ut_metadata"]),
347                 ExtendedPayload: append(p, data...),
348         }
349 }
350
351 type PieceStatusCharSequence struct {
352         Char  byte // The state of this sequence of pieces.
353         Count int  // How many consecutive pieces have this state.
354 }
355
356 // Returns the state of pieces of the torrent. They are grouped into runs of
357 // same state. The sum of the Counts of the sequences is the number of pieces
358 // in the torrent. See the function torrent.pieceStatusChar for the possible
359 // states.
360 func (t *torrent) PieceStatusCharSequences() []PieceStatusCharSequence {
361         t.stateMu.Lock()
362         defer t.stateMu.Unlock()
363         return t.pieceStatusCharSequences()
364 }
365
366 // Returns the length of sequences of identical piece status chars.
367 func (t *torrent) pieceStatusCharSequences() (ret []PieceStatusCharSequence) {
368         var (
369                 char  byte
370                 count int
371         )
372         writeSequence := func() {
373                 ret = append(ret, PieceStatusCharSequence{char, count})
374         }
375         if len(t.Pieces) != 0 {
376                 char = t.pieceStatusChar(0)
377         }
378         for index := range t.Pieces {
379                 char1 := t.pieceStatusChar(index)
380                 if char1 == char {
381                         count++
382                 } else {
383                         writeSequence()
384                         char = char1
385                         count = 1
386                 }
387         }
388         if count != 0 {
389                 writeSequence()
390         }
391         return
392 }
393
394 func (t *torrent) writeStatus(w io.Writer) {
395         fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash)
396         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
397         fmt.Fprintf(w, "Metadata have: ")
398         for _, h := range t.metadataHave {
399                 fmt.Fprintf(w, "%c", func() rune {
400                         if h {
401                                 return 'H'
402                         } else {
403                                 return '.'
404                         }
405                 }())
406         }
407         fmt.Fprintln(w)
408         fmt.Fprintf(w, "Piece length: %s\n", func() string {
409                 if t.haveInfo() {
410                         return fmt.Sprint(t.usualPieceSize())
411                 } else {
412                         return "?"
413                 }
414         }())
415         if t.haveInfo() {
416                 fmt.Fprint(w, "Pieces: ")
417                 for _, seq := range t.pieceStatusCharSequences() {
418                         fmt.Fprintf(w, "%d%c ", seq.Count, seq.Char)
419                 }
420                 fmt.Fprintln(w)
421         }
422         fmt.Fprintf(w, "Urgent:")
423         for req := range t.urgent {
424                 fmt.Fprintf(w, " %s", req)
425         }
426         fmt.Fprintln(w)
427         fmt.Fprintf(w, "Trackers: ")
428         for _, tier := range t.Trackers {
429                 for _, tr := range tier {
430                         fmt.Fprintf(w, "%q ", tr.String())
431                 }
432         }
433         fmt.Fprintf(w, "\n")
434         fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers))
435         fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen))
436         fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns))
437         sort.Sort(&worstConns{
438                 c: t.Conns,
439                 t: t,
440         })
441         for _, c := range t.Conns {
442                 c.WriteStatus(w, t)
443         }
444 }
445
446 func (t *torrent) String() string {
447         s := t.Name()
448         if s == "" {
449                 s = fmt.Sprintf("%x", t.InfoHash)
450         }
451         return s
452 }
453
454 func (t *torrent) haveInfo() bool {
455         return t.Info != nil
456 }
457
458 // TODO: Include URIs that weren't converted to tracker clients.
459 func (t *torrent) announceList() (al [][]string) {
460         for _, tier := range t.Trackers {
461                 var l []string
462                 for _, tr := range tier {
463                         l = append(l, tr.URL())
464                 }
465                 al = append(al, l)
466         }
467         return
468 }
469
470 // Returns a run-time generated MetaInfo that includes the info bytes and
471 // announce-list as currently known to the client.
472 func (t *torrent) MetaInfo() *metainfo.MetaInfo {
473         if t.MetaData == nil {
474                 panic("info bytes not set")
475         }
476         return &metainfo.MetaInfo{
477                 Info: metainfo.InfoEx{
478                         Info:  *t.Info,
479                         Bytes: t.MetaData,
480                 },
481                 CreationDate: time.Now().Unix(),
482                 Comment:      "dynamic metainfo from client",
483                 CreatedBy:    "go.torrent",
484                 AnnounceList: t.announceList(),
485         }
486 }
487
488 func (t *torrent) bytesLeft() (left int64) {
489         if !t.haveInfo() {
490                 return -1
491         }
492         for i := 0; i < t.numPieces(); i++ {
493                 left += int64(t.pieceNumPendingBytes(i))
494         }
495         return
496 }
497
498 func (t *torrent) piecePartiallyDownloaded(index int) bool {
499         return t.pieceNumPendingBytes(index) != t.pieceLength(index)
500 }
501
502 func numChunksForPiece(chunkSize int, pieceSize int) int {
503         return (pieceSize + chunkSize - 1) / chunkSize
504 }
505
506 func (t *torrent) usualPieceSize() int {
507         return int(t.Info.PieceLength)
508 }
509
510 func (t *torrent) lastPieceSize() int {
511         return int(t.pieceLength(t.numPieces() - 1))
512 }
513
514 func (t *torrent) numPieces() int {
515         return t.Info.NumPieces()
516 }
517
518 func (t *torrent) numPiecesCompleted() (num int) {
519         for i := range iter.N(t.Info.NumPieces()) {
520                 if t.pieceComplete(i) {
521                         num++
522                 }
523         }
524         return
525 }
526
527 func (t *torrent) Length() int64 {
528         return t.length
529 }
530
531 func (t *torrent) isClosed() bool {
532         select {
533         case <-t.closing:
534                 return true
535         default:
536                 return false
537         }
538 }
539
540 func (t *torrent) close() (err error) {
541         if t.isClosed() {
542                 return
543         }
544         t.ceaseNetworking()
545         close(t.closing)
546         if c, ok := t.data.(io.Closer); ok {
547                 c.Close()
548         }
549         for _, conn := range t.Conns {
550                 conn.Close()
551         }
552         return
553 }
554
555 func (t *torrent) requestOffset(r request) int64 {
556         return torrentRequestOffset(t.Length(), int64(t.usualPieceSize()), r)
557 }
558
559 // Return the request that would include the given offset into the torrent
560 // data. Returns !ok if there is no such request.
561 func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
562         return torrentOffsetRequest(t.Length(), t.Info.PieceLength, chunkSize, off)
563 }
564
565 func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
566         _, err = t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
567         return
568 }
569
570 func (t *torrent) bitfield() (bf []bool) {
571         for _, p := range t.Pieces {
572                 // TODO: Check this logic.
573                 bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0)
574         }
575         return
576 }
577
578 func (t *torrent) pieceChunks(piece int) (css []chunkSpec) {
579         css = make([]chunkSpec, 0, (t.pieceLength(piece)+chunkSize-1)/chunkSize)
580         var cs chunkSpec
581         for left := t.pieceLength(piece); left != 0; left -= cs.Length {
582                 cs.Length = left
583                 if cs.Length > chunkSize {
584                         cs.Length = chunkSize
585                 }
586                 css = append(css, cs)
587                 cs.Begin += cs.Length
588         }
589         return
590 }
591
592 func (t *torrent) pendAllChunkSpecs(index int) {
593         piece := t.Pieces[index]
594         if piece.PendingChunkSpecs == nil {
595                 piece.PendingChunkSpecs = make(
596                         map[chunkSpec]struct{},
597                         (t.pieceLength(index)+chunkSize-1)/chunkSize)
598         }
599         pcss := piece.PendingChunkSpecs
600         for _, cs := range t.pieceChunks(int(index)) {
601                 pcss[cs] = struct{}{}
602         }
603         return
604 }
605
606 type Peer struct {
607         Id     [20]byte
608         IP     net.IP
609         Port   int
610         Source peerSource
611         // Peer is known to support encryption.
612         SupportsEncryption bool
613 }
614
615 func (t *torrent) pieceLength(piece int) (len_ pp.Integer) {
616         if int(piece) == t.numPieces()-1 {
617                 len_ = pp.Integer(t.Length() % t.Info.PieceLength)
618         }
619         if len_ == 0 {
620                 len_ = pp.Integer(t.Info.PieceLength)
621         }
622         return
623 }
624
625 func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) {
626         hash := pieceHash.New()
627         t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
628         util.CopyExact(ps[:], hash.Sum(nil))
629         return
630 }
631
632 func (t *torrent) haveAllPieces() bool {
633         if !t.haveInfo() {
634                 return false
635         }
636         for i := range t.Pieces {
637                 if !t.pieceComplete(i) {
638                         return false
639                 }
640         }
641         return true
642 }
643
644 func (me *torrent) haveAnyPieces() bool {
645         for i := range me.Pieces {
646                 if me.pieceComplete(i) {
647                         return true
648                 }
649         }
650         return false
651 }
652
653 func (t *torrent) havePiece(index int) bool {
654         return t.haveInfo() && t.pieceComplete(index)
655 }
656
657 func (t *torrent) haveChunk(r request) bool {
658         if !t.haveInfo() {
659                 return false
660         }
661         piece := t.Pieces[r.Index]
662         _, ok := piece.PendingChunkSpecs[r.chunkSpec]
663         // log.Println("have chunk", r, !ok)
664         return !ok
665 }
666
667 func (t *torrent) wantChunk(r request) bool {
668         if !t.wantPiece(int(r.Index)) {
669                 return false
670         }
671         _, ok := t.Pieces[r.Index].PendingChunkSpecs[r.chunkSpec]
672         if ok {
673                 return true
674         }
675         _, ok = t.urgent[r]
676         return ok
677 }
678
679 func (t *torrent) urgentChunkInPiece(piece int) bool {
680         for req := range t.urgent {
681                 if int(req.Index) == piece {
682                         return true
683                 }
684         }
685         return false
686 }
687
688 func (t *torrent) wantPiece(index int) bool {
689         if !t.haveInfo() {
690                 return false
691         }
692         p := t.Pieces[index]
693         if p.QueuedForHash {
694                 return false
695         }
696         if p.Hashing {
697                 return false
698         }
699         if p.Priority == piecePriorityNone {
700                 if !t.urgentChunkInPiece(index) {
701                         return false
702                 }
703         }
704         // Put piece complete check last, since it's the slowest as it can involve
705         // calling out into external data stores.
706         return !t.pieceComplete(index)
707 }
708
709 func (t *torrent) connHasWantedPieces(c *connection) bool {
710         return c.pieceRequestOrder != nil && c.pieceRequestOrder.First() != nil
711 }
712
713 func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
714         for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
715                 pieces = append(pieces, int(i))
716         }
717         return
718 }