]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Use rogpeppe's sortimports to fix this goimports ordering madness
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "errors"
6         "fmt"
7         "io"
8         "log"
9         "net"
10         "os"
11         "sort"
12         "sync"
13         "time"
14
15         "github.com/anacrolix/libtorgo/bencode"
16         "github.com/anacrolix/libtorgo/metainfo"
17         "github.com/bradfitz/iter"
18
19         "github.com/anacrolix/torrent/data"
20         pp "github.com/anacrolix/torrent/peer_protocol"
21         "github.com/anacrolix/torrent/tracker"
22         "github.com/anacrolix/torrent/util"
23 )
24
25 func (t *torrent) pieceNumPendingBytes(index int) (count pp.Integer) {
26         if t.pieceComplete(index) {
27                 return 0
28         }
29         piece := t.Pieces[index]
30         if !piece.EverHashed {
31                 return t.pieceLength(index)
32         }
33         pendingChunks := t.Pieces[index].PendingChunkSpecs
34         count = pp.Integer(len(pendingChunks)) * chunkSize
35         _lastChunkSpec := lastChunkSpec(t.pieceLength(index))
36         if _lastChunkSpec.Length != chunkSize {
37                 if _, ok := pendingChunks[_lastChunkSpec]; ok {
38                         count += _lastChunkSpec.Length - chunkSize
39                 }
40         }
41         return
42 }
43
44 type peersKey struct {
45         IPBytes string
46         Port    int
47 }
48
49 // Data maintains per-piece persistent state.
50 type StatefulData interface {
51         data.Data
52         // We believe the piece data will pass a hash check.
53         PieceCompleted(index int) error
54         // Returns true if the piece is complete.
55         PieceComplete(index int) bool
56 }
57
58 // Is not aware of Client. Maintains state of torrent for with-in a Client.
59 type torrent struct {
60         stateMu sync.Mutex
61         closing chan struct{}
62
63         // Closed when no more network activity is desired. This includes
64         // announcing, and communicating with peers.
65         ceasingNetworking chan struct{}
66
67         InfoHash InfoHash
68         Pieces   []*piece
69         // Total length of the torrent in bytes. Stored because it's not O(1) to
70         // get this from the info dict.
71         length int64
72
73         data StatefulData
74
75         // The info dict. Nil if we don't have it.
76         Info *metainfo.Info
77         // Active peer connections, running message stream loops.
78         Conns []*connection
79         // Set of addrs to which we're attempting to connect. Connections are
80         // half-open until all handshakes are completed.
81         HalfOpen map[string]struct{}
82
83         // Reserve of peers to connect to. A peer can be both here and in the
84         // active connections if were told about the peer after connecting with
85         // them. That encourages us to reconnect to peers that are well known.
86         Peers     map[peersKey]Peer
87         wantPeers sync.Cond
88
89         // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
90         // mirror their respective URLs from the announce-list metainfo key.
91         Trackers [][]tracker.Client
92         // Name used if the info name isn't available.
93         DisplayName string
94         // The bencoded bytes of the info dict.
95         MetaData []byte
96         // Each element corresponds to the 16KiB metadata pieces. If true, we have
97         // received that piece.
98         metadataHave []bool
99
100         // Closed when .Info is set.
101         gotMetainfo chan struct{}
102         GotMetainfo <-chan struct{}
103
104         pruneTimer *time.Timer
105 }
106
107 func (t *torrent) pieceComplete(piece int) bool {
108         // TODO: This is called when setting metadata, and before storage is
109         // assigned, which doesn't seem right.
110         return t.data != nil && t.data.PieceComplete(piece)
111 }
112
113 // A file-like handle to torrent data that implements SectionOpener. Opened
114 // sections will be reused so long as Reads and ReadAt's are contiguous.
115 type handle struct {
116         rc     io.ReadCloser
117         rcOff  int64
118         curOff int64
119         so     SectionOpener
120         size   int64
121         t      Torrent
122 }
123
124 func (h *handle) Close() error {
125         if h.rc != nil {
126                 return h.rc.Close()
127         }
128         return nil
129 }
130
131 func (h *handle) ReadAt(b []byte, off int64) (n int, err error) {
132         return h.readAt(b, off)
133 }
134
135 func (h *handle) readAt(b []byte, off int64) (n int, err error) {
136         avail := h.t.prepareRead(off)
137         if int64(len(b)) > avail {
138                 b = b[:avail]
139         }
140         if int64(len(b)) > h.size-off {
141                 b = b[:h.size-off]
142         }
143         if h.rcOff != off && h.rc != nil {
144                 h.rc.Close()
145                 h.rc = nil
146         }
147         if h.rc == nil {
148                 h.rc, err = h.so.OpenSection(off, h.size-off)
149                 if err != nil {
150                         return
151                 }
152                 h.rcOff = off
153         }
154         n, err = h.rc.Read(b)
155         h.rcOff += int64(n)
156         return
157 }
158
159 func (h *handle) Read(b []byte) (n int, err error) {
160         n, err = h.readAt(b, h.curOff)
161         h.curOff = h.rcOff
162         return
163 }
164
165 func (h *handle) Seek(off int64, whence int) (newOff int64, err error) {
166         switch whence {
167         case os.SEEK_SET:
168                 h.curOff = off
169         case os.SEEK_CUR:
170                 h.curOff += off
171         case os.SEEK_END:
172                 h.curOff = h.size + off
173         default:
174                 err = errors.New("bad whence")
175         }
176         newOff = h.curOff
177         return
178 }
179
180 // Implements Handle on top of an io.SectionReader.
181 type sectionReaderHandle struct {
182         *io.SectionReader
183 }
184
185 func (sectionReaderHandle) Close() error { return nil }
186
187 func (T Torrent) NewReadHandle() Handle {
188         if so, ok := T.data.(SectionOpener); ok {
189                 return &handle{
190                         so:   so,
191                         size: T.Length(),
192                         t:    T,
193                 }
194         }
195         return sectionReaderHandle{io.NewSectionReader(T, 0, T.Length())}
196 }
197
198 func (t *torrent) numConnsUnchoked() (num int) {
199         for _, c := range t.Conns {
200                 if !c.PeerChoked {
201                         num++
202                 }
203         }
204         return
205 }
206
207 // There's a connection to that address already.
208 func (t *torrent) addrActive(addr string) bool {
209         if _, ok := t.HalfOpen[addr]; ok {
210                 return true
211         }
212         for _, c := range t.Conns {
213                 if c.remoteAddr().String() == addr {
214                         return true
215                 }
216         }
217         return false
218 }
219
220 func (t *torrent) worstConnsHeap() (wcs *worstConns) {
221         wcs = &worstConns{
222                 c: append([]*connection{}, t.Conns...),
223                 t: t,
224         }
225         heap.Init(wcs)
226         return
227 }
228
229 func (t *torrent) ceaseNetworking() {
230         t.stateMu.Lock()
231         defer t.stateMu.Unlock()
232         select {
233         case <-t.ceasingNetworking:
234                 return
235         default:
236         }
237         close(t.ceasingNetworking)
238         for _, c := range t.Conns {
239                 c.Close()
240         }
241         t.pruneTimer.Stop()
242 }
243
244 func (t *torrent) addPeer(p Peer) {
245         t.Peers[peersKey{string(p.IP), p.Port}] = p
246 }
247
248 func (t *torrent) invalidateMetadata() {
249         t.MetaData = nil
250         t.metadataHave = nil
251         t.Info = nil
252 }
253
254 func (t *torrent) saveMetadataPiece(index int, data []byte) {
255         if t.haveInfo() {
256                 return
257         }
258         if index >= len(t.metadataHave) {
259                 log.Printf("%s: ignoring metadata piece %d", t, index)
260                 return
261         }
262         copy(t.MetaData[(1<<14)*index:], data)
263         t.metadataHave[index] = true
264 }
265
266 func (t *torrent) metadataPieceCount() int {
267         return (len(t.MetaData) + (1 << 14) - 1) / (1 << 14)
268 }
269
270 func (t *torrent) haveMetadataPiece(piece int) bool {
271         if t.haveInfo() {
272                 return (1<<14)*piece < len(t.MetaData)
273         } else {
274                 return piece < len(t.metadataHave) && t.metadataHave[piece]
275         }
276 }
277
278 func (t *torrent) metadataSizeKnown() bool {
279         return t.MetaData != nil
280 }
281
282 func (t *torrent) metadataSize() int {
283         return len(t.MetaData)
284 }
285
286 func infoPieceHashes(info *metainfo.Info) (ret []string) {
287         for i := 0; i < len(info.Pieces); i += 20 {
288                 ret = append(ret, string(info.Pieces[i:i+20]))
289         }
290         return
291 }
292
293 // Called when metadata for a torrent becomes available.
294 func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte, eventLocker sync.Locker) (err error) {
295         t.Info = md
296         t.length = 0
297         for _, f := range t.Info.UpvertedFiles() {
298                 t.length += f.Length
299         }
300         t.MetaData = infoBytes
301         t.metadataHave = nil
302         for _, hash := range infoPieceHashes(md) {
303                 piece := &piece{}
304                 piece.Event.L = eventLocker
305                 util.CopyExact(piece.Hash[:], hash)
306                 t.Pieces = append(t.Pieces, piece)
307         }
308         for _, conn := range t.Conns {
309                 t.initRequestOrdering(conn)
310                 if err := conn.setNumPieces(t.numPieces()); err != nil {
311                         log.Printf("closing connection: %s", err)
312                         conn.Close()
313                 }
314         }
315         return
316 }
317
318 func (t *torrent) setStorage(td data.Data) (err error) {
319         if c, ok := t.data.(io.Closer); ok {
320                 c.Close()
321         }
322         if sd, ok := td.(StatefulData); ok {
323                 t.data = sd
324         } else {
325                 t.data = &statelessDataWrapper{td, make([]bool, t.Info.NumPieces())}
326         }
327         return
328 }
329
330 func (t *torrent) haveAllMetadataPieces() bool {
331         if t.haveInfo() {
332                 return true
333         }
334         if t.metadataHave == nil {
335                 return false
336         }
337         for _, have := range t.metadataHave {
338                 if !have {
339                         return false
340                 }
341         }
342         return true
343 }
344
345 func (t *torrent) setMetadataSize(bytes int64) {
346         if t.MetaData != nil {
347                 return
348         }
349         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
350                 return
351         }
352         t.MetaData = make([]byte, bytes)
353         t.metadataHave = make([]bool, (bytes+(1<<14)-1)/(1<<14))
354 }
355
356 // The current working name for the torrent. Either the name in the info dict,
357 // or a display name given such as by the dn value in a magnet link, or "".
358 func (t *torrent) Name() string {
359         if t.haveInfo() {
360                 return t.Info.Name
361         }
362         if t.DisplayName != "" {
363                 return t.DisplayName
364         }
365         return ""
366 }
367
368 func (t *torrent) pieceStatusChar(index int) byte {
369         p := t.Pieces[index]
370         switch {
371         case t.pieceComplete(index):
372                 return 'C'
373         case p.QueuedForHash:
374                 return 'Q'
375         case p.Hashing:
376                 return 'H'
377         case !p.EverHashed:
378                 return '?'
379         case t.piecePartiallyDownloaded(index):
380                 switch p.Priority {
381                 case piecePriorityNone:
382                         return 'F' // Forgotten
383                 default:
384                         return 'P'
385                 }
386         default:
387                 switch p.Priority {
388                 case piecePriorityNone:
389                         return 'z'
390                 case piecePriorityNow:
391                         return '!'
392                 case piecePriorityReadahead:
393                         return 'R'
394                 case piecePriorityNext:
395                         return 'N'
396                 default:
397                         return '.'
398                 }
399         }
400 }
401
402 func (t *torrent) metadataPieceSize(piece int) int {
403         return metadataPieceSize(len(t.MetaData), piece)
404 }
405
406 func (t *torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
407         d := map[string]int{
408                 "msg_type": msgType,
409                 "piece":    piece,
410         }
411         if data != nil {
412                 d["total_size"] = len(t.MetaData)
413         }
414         p, err := bencode.Marshal(d)
415         if err != nil {
416                 panic(err)
417         }
418         return pp.Message{
419                 Type:            pp.Extended,
420                 ExtendedID:      byte(c.PeerExtensionIDs["ut_metadata"]),
421                 ExtendedPayload: append(p, data...),
422         }
423 }
424
425 type PieceStatusCharSequence struct {
426         Char  byte // The state of this sequence of pieces.
427         Count int  // How many consecutive pieces have this state.
428 }
429
430 // Returns the state of pieces of the torrent. They are grouped into runs of
431 // same state. The sum of the Counts of the sequences is the number of pieces
432 // in the torrent. See the function torrent.pieceStatusChar for the possible
433 // states.
434 func (t *torrent) PieceStatusCharSequences() []PieceStatusCharSequence {
435         t.stateMu.Lock()
436         defer t.stateMu.Unlock()
437         return t.pieceStatusCharSequences()
438 }
439
440 // Returns the length of sequences of identical piece status chars.
441 func (t *torrent) pieceStatusCharSequences() (ret []PieceStatusCharSequence) {
442         var (
443                 char  byte
444                 count int
445         )
446         writeSequence := func() {
447                 ret = append(ret, PieceStatusCharSequence{char, count})
448         }
449         if len(t.Pieces) != 0 {
450                 char = t.pieceStatusChar(0)
451         }
452         for index := range t.Pieces {
453                 char1 := t.pieceStatusChar(index)
454                 if char1 == char {
455                         count++
456                 } else {
457                         writeSequence()
458                         char = char1
459                         count = 1
460                 }
461         }
462         if count != 0 {
463                 writeSequence()
464         }
465         return
466 }
467
468 func (t *torrent) writeStatus(w io.Writer) {
469         fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash)
470         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
471         fmt.Fprintf(w, "Metadata have: ")
472         for _, h := range t.metadataHave {
473                 fmt.Fprintf(w, "%c", func() rune {
474                         if h {
475                                 return 'H'
476                         } else {
477                                 return '.'
478                         }
479                 }())
480         }
481         fmt.Fprintln(w)
482         fmt.Fprintf(w, "Piece length: %s\n", func() string {
483                 if t.haveInfo() {
484                         return fmt.Sprint(t.usualPieceSize())
485                 } else {
486                         return "?"
487                 }
488         }())
489         if t.haveInfo() {
490                 fmt.Fprint(w, "Pieces: ")
491                 for _, seq := range t.pieceStatusCharSequences() {
492                         fmt.Fprintf(w, "%d%c ", seq.Count, seq.Char)
493                 }
494                 fmt.Fprintln(w)
495         }
496         fmt.Fprintf(w, "Trackers: ")
497         for _, tier := range t.Trackers {
498                 for _, tr := range tier {
499                         fmt.Fprintf(w, "%q ", tr.String())
500                 }
501         }
502         fmt.Fprintf(w, "\n")
503         fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers))
504         fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen))
505         fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns))
506         sort.Sort(&worstConns{
507                 c: t.Conns,
508                 t: t,
509         })
510         for _, c := range t.Conns {
511                 c.WriteStatus(w, t)
512         }
513 }
514
515 func (t *torrent) String() string {
516         s := t.Name()
517         if s == "" {
518                 s = fmt.Sprintf("%x", t.InfoHash)
519         }
520         return s
521 }
522
523 func (t *torrent) haveInfo() bool {
524         return t.Info != nil
525 }
526
527 // TODO: Include URIs that weren't converted to tracker clients.
528 func (t *torrent) announceList() (al [][]string) {
529         for _, tier := range t.Trackers {
530                 var l []string
531                 for _, tr := range tier {
532                         l = append(l, tr.URL())
533                 }
534                 al = append(al, l)
535         }
536         return
537 }
538
539 // Returns a run-time generated MetaInfo that includes the info bytes and
540 // announce-list as currently known to the client.
541 func (t *torrent) MetaInfo() *metainfo.MetaInfo {
542         if t.MetaData == nil {
543                 panic("info bytes not set")
544         }
545         return &metainfo.MetaInfo{
546                 Info: metainfo.InfoEx{
547                         Info:  *t.Info,
548                         Bytes: t.MetaData,
549                 },
550                 CreationDate: time.Now().Unix(),
551                 Comment:      "dynamic metainfo from client",
552                 CreatedBy:    "go.torrent",
553                 AnnounceList: t.announceList(),
554         }
555 }
556
557 func (t *torrent) bytesLeft() (left int64) {
558         if !t.haveInfo() {
559                 return -1
560         }
561         for i := 0; i < t.numPieces(); i++ {
562                 left += int64(t.pieceNumPendingBytes(i))
563         }
564         return
565 }
566
567 func (t *torrent) piecePartiallyDownloaded(index int) bool {
568         return t.pieceNumPendingBytes(index) != t.pieceLength(index)
569 }
570
571 func numChunksForPiece(chunkSize int, pieceSize int) int {
572         return (pieceSize + chunkSize - 1) / chunkSize
573 }
574
575 func (t *torrent) usualPieceSize() int {
576         return int(t.Info.PieceLength)
577 }
578
579 func (t *torrent) lastPieceSize() int {
580         return int(t.pieceLength(t.numPieces() - 1))
581 }
582
583 func (t *torrent) numPieces() int {
584         return t.Info.NumPieces()
585 }
586
587 func (t *torrent) numPiecesCompleted() (num int) {
588         for i := range iter.N(t.Info.NumPieces()) {
589                 if t.pieceComplete(i) {
590                         num++
591                 }
592         }
593         return
594 }
595
596 func (t *torrent) Length() int64 {
597         return t.length
598 }
599
600 func (t *torrent) isClosed() bool {
601         select {
602         case <-t.closing:
603                 return true
604         default:
605                 return false
606         }
607 }
608
609 func (t *torrent) close() (err error) {
610         if t.isClosed() {
611                 return
612         }
613         t.ceaseNetworking()
614         close(t.closing)
615         if c, ok := t.data.(io.Closer); ok {
616                 c.Close()
617         }
618         for _, conn := range t.Conns {
619                 conn.Close()
620         }
621         return
622 }
623
624 // Return the request that would include the given offset into the torrent data.
625 func torrentOffsetRequest(torrentLength, pieceSize, chunkSize, offset int64) (
626         r request, ok bool) {
627         if offset < 0 || offset >= torrentLength {
628                 return
629         }
630         r.Index = pp.Integer(offset / pieceSize)
631         r.Begin = pp.Integer(offset % pieceSize / chunkSize * chunkSize)
632         left := torrentLength - int64(r.Index)*pieceSize - int64(r.Begin)
633         if chunkSize < left {
634                 r.Length = pp.Integer(chunkSize)
635         } else {
636                 r.Length = pp.Integer(left)
637         }
638         ok = true
639         return
640 }
641
642 func torrentRequestOffset(torrentLength, pieceSize int64, r request) (off int64) {
643         off = int64(r.Index)*pieceSize + int64(r.Begin)
644         if off < 0 || off >= torrentLength {
645                 panic("invalid request")
646         }
647         return
648 }
649
650 func (t *torrent) requestOffset(r request) int64 {
651         return torrentRequestOffset(t.Length(), int64(t.usualPieceSize()), r)
652 }
653
654 // Return the request that would include the given offset into the torrent data.
655 func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
656         return torrentOffsetRequest(t.Length(), t.Info.PieceLength, chunkSize, off)
657 }
658
659 func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
660         _, err = t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
661         return
662 }
663
664 func (t *torrent) bitfield() (bf []bool) {
665         for _, p := range t.Pieces {
666                 bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0)
667         }
668         return
669 }
670
671 func (t *torrent) pieceChunks(piece int) (css []chunkSpec) {
672         css = make([]chunkSpec, 0, (t.pieceLength(piece)+chunkSize-1)/chunkSize)
673         var cs chunkSpec
674         for left := t.pieceLength(piece); left != 0; left -= cs.Length {
675                 cs.Length = left
676                 if cs.Length > chunkSize {
677                         cs.Length = chunkSize
678                 }
679                 css = append(css, cs)
680                 cs.Begin += cs.Length
681         }
682         return
683 }
684
685 func (t *torrent) pendAllChunkSpecs(index int) {
686         piece := t.Pieces[index]
687         if piece.PendingChunkSpecs == nil {
688                 piece.PendingChunkSpecs = make(
689                         map[chunkSpec]struct{},
690                         (t.pieceLength(index)+chunkSize-1)/chunkSize)
691         }
692         pcss := piece.PendingChunkSpecs
693         for _, cs := range t.pieceChunks(int(index)) {
694                 pcss[cs] = struct{}{}
695         }
696         return
697 }
698
699 type Peer struct {
700         Id     [20]byte
701         IP     net.IP
702         Port   int
703         Source peerSource
704         // Peer is known to support encryption.
705         SupportsEncryption bool
706 }
707
708 func (t *torrent) pieceLength(piece int) (len_ pp.Integer) {
709         if int(piece) == t.numPieces()-1 {
710                 len_ = pp.Integer(t.Length() % t.Info.PieceLength)
711         }
712         if len_ == 0 {
713                 len_ = pp.Integer(t.Info.PieceLength)
714         }
715         return
716 }
717
718 func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) {
719         hash := pieceHash.New()
720         t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
721         util.CopyExact(ps[:], hash.Sum(nil))
722         return
723 }
724
725 func (t *torrent) haveAllPieces() bool {
726         if !t.haveInfo() {
727                 return false
728         }
729         for i := range t.Pieces {
730                 if !t.pieceComplete(i) {
731                         return false
732                 }
733         }
734         return true
735 }
736
737 func (me *torrent) haveAnyPieces() bool {
738         for i := range me.Pieces {
739                 if me.pieceComplete(i) {
740                         return true
741                 }
742         }
743         return false
744 }
745
746 func (t *torrent) havePiece(index int) bool {
747         return t.haveInfo() && t.pieceComplete(index)
748 }
749
750 func (t *torrent) haveChunk(r request) bool {
751         p := t.Pieces[r.Index]
752         if !p.EverHashed {
753                 return false
754         }
755         _, ok := p.PendingChunkSpecs[r.chunkSpec]
756         return !ok
757 }
758
759 func (t *torrent) wantChunk(r request) bool {
760         if !t.wantPiece(int(r.Index)) {
761                 return false
762         }
763         _, ok := t.Pieces[r.Index].PendingChunkSpecs[r.chunkSpec]
764         return ok
765 }
766
767 func (t *torrent) wantPiece(index int) bool {
768         if !t.haveInfo() {
769                 return false
770         }
771         p := t.Pieces[index]
772         // Put piece complete check last, since it's the slowest!
773         return p.Priority != piecePriorityNone && !p.QueuedForHash && !p.Hashing && !t.pieceComplete(index)
774 }
775
776 func (t *torrent) connHasWantedPieces(c *connection) bool {
777         return c.pieceRequestOrder != nil && c.pieceRequestOrder.First() != nil
778 }
779
780 func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
781         for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
782                 pieces = append(pieces, int(i))
783         }
784         return
785 }