]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Add support for Fast Extension
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "errors"
6         "fmt"
7         "io"
8         "log"
9         "net"
10         "os"
11         "sort"
12         "sync"
13         "time"
14
15         "github.com/bradfitz/iter"
16
17         "bitbucket.org/anacrolix/go.torrent/data"
18         pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
19         "bitbucket.org/anacrolix/go.torrent/tracker"
20         "bitbucket.org/anacrolix/go.torrent/util"
21         "github.com/anacrolix/libtorgo/bencode"
22         "github.com/anacrolix/libtorgo/metainfo"
23 )
24
25 func (t *torrent) PieceNumPendingBytes(index int) (count pp.Integer) {
26         if t.pieceComplete(index) {
27                 return 0
28         }
29         piece := t.Pieces[index]
30         if !piece.EverHashed {
31                 return t.PieceLength(index)
32         }
33         pendingChunks := t.Pieces[index].PendingChunkSpecs
34         count = pp.Integer(len(pendingChunks)) * chunkSize
35         _lastChunkSpec := lastChunkSpec(t.PieceLength(index))
36         if _lastChunkSpec.Length != chunkSize {
37                 if _, ok := pendingChunks[_lastChunkSpec]; ok {
38                         count += _lastChunkSpec.Length - chunkSize
39                 }
40         }
41         return
42 }
43
44 type peersKey struct {
45         IPBytes string
46         Port    int
47 }
48
49 // Data maintains per-piece persistent state.
50 type StatefulData interface {
51         data.Data
52         // We believe the piece data will pass a hash check.
53         PieceCompleted(index int) error
54         // Returns true if the piece is complete.
55         PieceComplete(index int) bool
56 }
57
58 // Is not aware of Client. Maintains state of torrent for with-in a Client.
59 type torrent struct {
60         stateMu sync.Mutex
61         closing chan struct{}
62
63         // Closed when no more network activity is desired. This includes
64         // announcing, and communicating with peers.
65         ceasingNetworking chan struct{}
66
67         InfoHash InfoHash
68         Pieces   []*piece
69         length   int64
70
71         data StatefulData
72
73         Info *metainfo.Info
74         // Active peer connections.
75         Conns []*connection
76         // Set of addrs to which we're attempting to connect.
77         HalfOpen map[string]struct{}
78
79         // Reserve of peers to connect to. A peer can be both here and in the
80         // active connections if were told about the peer after connecting with
81         // them. That encourages us to reconnect to peers that are well known.
82         Peers     map[peersKey]Peer
83         wantPeers sync.Cond
84
85         // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
86         // mirror their respective URLs from the announce-list metainfo key.
87         Trackers     [][]tracker.Client
88         DisplayName  string
89         MetaData     []byte
90         metadataHave []bool
91
92         gotMetainfo chan struct{}
93         GotMetainfo <-chan struct{}
94
95         pruneTimer *time.Timer
96 }
97
98 func (t *torrent) pieceComplete(piece int) bool {
99         // TODO: This is called when setting metadata, and before storage is
100         // assigned, which doesn't seem right.
101         return t.data != nil && t.data.PieceComplete(piece)
102 }
103
104 // A file-like handle to torrent data that implements SectionOpener. Opened
105 // sections will be reused so long as Reads and ReadAt's are contiguous.
106 type handle struct {
107         rc     io.ReadCloser
108         rcOff  int64
109         curOff int64
110         so     SectionOpener
111         size   int64
112         t      Torrent
113 }
114
115 func (h *handle) Close() error {
116         if h.rc != nil {
117                 return h.rc.Close()
118         }
119         return nil
120 }
121
122 func (h *handle) ReadAt(b []byte, off int64) (n int, err error) {
123         return h.readAt(b, off)
124 }
125
126 func (h *handle) readAt(b []byte, off int64) (n int, err error) {
127         avail := h.t.prepareRead(off)
128         if int64(len(b)) > avail {
129                 b = b[:avail]
130         }
131         if int64(len(b)) > h.size-off {
132                 b = b[:h.size-off]
133         }
134         if h.rcOff != off && h.rc != nil {
135                 h.rc.Close()
136                 h.rc = nil
137         }
138         if h.rc == nil {
139                 h.rc, err = h.so.OpenSection(off, h.size-off)
140                 if err != nil {
141                         return
142                 }
143                 h.rcOff = off
144         }
145         n, err = h.rc.Read(b)
146         h.rcOff += int64(n)
147         return
148 }
149
150 func (h *handle) Read(b []byte) (n int, err error) {
151         n, err = h.readAt(b, h.curOff)
152         h.curOff = h.rcOff
153         return
154 }
155
156 func (h *handle) Seek(off int64, whence int) (newOff int64, err error) {
157         switch whence {
158         case os.SEEK_SET:
159                 h.curOff = off
160         case os.SEEK_CUR:
161                 h.curOff += off
162         case os.SEEK_END:
163                 h.curOff = h.size + off
164         default:
165                 err = errors.New("bad whence")
166         }
167         newOff = h.curOff
168         return
169 }
170
171 // Implements Handle on top of an io.SectionReader.
172 type sectionReaderHandle struct {
173         *io.SectionReader
174 }
175
176 func (sectionReaderHandle) Close() error { return nil }
177
178 func (T Torrent) NewReadHandle() Handle {
179         if so, ok := T.data.(SectionOpener); ok {
180                 return &handle{
181                         so:   so,
182                         size: T.Length(),
183                         t:    T,
184                 }
185         }
186         return sectionReaderHandle{io.NewSectionReader(T, 0, T.Length())}
187 }
188
189 func (t *torrent) numConnsUnchoked() (num int) {
190         for _, c := range t.Conns {
191                 if !c.PeerChoked {
192                         num++
193                 }
194         }
195         return
196 }
197
198 func (t *torrent) addrActive(addr string) bool {
199         if _, ok := t.HalfOpen[addr]; ok {
200                 return true
201         }
202         for _, c := range t.Conns {
203                 if c.Socket.RemoteAddr().String() == addr {
204                         return true
205                 }
206         }
207         return false
208 }
209
210 func (t *torrent) worstConnsHeap() (wcs *worstConns) {
211         wcs = &worstConns{
212                 c: append([]*connection{}, t.Conns...),
213                 t: t,
214         }
215         heap.Init(wcs)
216         return
217 }
218
219 func (t *torrent) ceaseNetworking() {
220         t.stateMu.Lock()
221         defer t.stateMu.Unlock()
222         select {
223         case <-t.ceasingNetworking:
224                 return
225         default:
226         }
227         close(t.ceasingNetworking)
228         for _, c := range t.Conns {
229                 c.Close()
230         }
231         t.pruneTimer.Stop()
232 }
233
234 func (t *torrent) addPeer(p Peer) {
235         t.Peers[peersKey{string(p.IP), p.Port}] = p
236 }
237
238 func (t *torrent) invalidateMetadata() {
239         t.MetaData = nil
240         t.metadataHave = nil
241         t.Info = nil
242 }
243
244 func (t *torrent) SaveMetadataPiece(index int, data []byte) {
245         if t.haveInfo() {
246                 return
247         }
248         if index >= len(t.metadataHave) {
249                 log.Printf("%s: ignoring metadata piece %d", t, index)
250                 return
251         }
252         copy(t.MetaData[(1<<14)*index:], data)
253         t.metadataHave[index] = true
254 }
255
256 func (t *torrent) metadataPieceCount() int {
257         return (len(t.MetaData) + (1 << 14) - 1) / (1 << 14)
258 }
259
260 func (t *torrent) haveMetadataPiece(piece int) bool {
261         if t.haveInfo() {
262                 return (1<<14)*piece < len(t.MetaData)
263         } else {
264                 return piece < len(t.metadataHave) && t.metadataHave[piece]
265         }
266 }
267
268 func (t *torrent) metadataSizeKnown() bool {
269         return t.MetaData != nil
270 }
271
272 func (t *torrent) metadataSize() int {
273         return len(t.MetaData)
274 }
275
276 func infoPieceHashes(info *metainfo.Info) (ret []string) {
277         for i := 0; i < len(info.Pieces); i += 20 {
278                 ret = append(ret, string(info.Pieces[i:i+20]))
279         }
280         return
281 }
282
283 // Called when metadata for a torrent becomes available.
284 func (t *torrent) setMetadata(md metainfo.Info, infoBytes []byte, eventLocker sync.Locker) (err error) {
285         t.Info = &md
286         t.length = 0
287         for _, f := range t.Info.UpvertedFiles() {
288                 t.length += f.Length
289         }
290         t.MetaData = infoBytes
291         t.metadataHave = nil
292         for _, hash := range infoPieceHashes(&md) {
293                 piece := &piece{}
294                 piece.Event.L = eventLocker
295                 util.CopyExact(piece.Hash[:], hash)
296                 t.Pieces = append(t.Pieces, piece)
297         }
298         for _, conn := range t.Conns {
299                 t.initRequestOrdering(conn)
300                 if err := conn.setNumPieces(t.numPieces()); err != nil {
301                         log.Printf("closing connection: %s", err)
302                         conn.Close()
303                 }
304         }
305         return
306 }
307
308 func (t *torrent) setStorage(td data.Data) (err error) {
309         if c, ok := t.data.(io.Closer); ok {
310                 c.Close()
311         }
312         if sd, ok := td.(StatefulData); ok {
313                 t.data = sd
314         } else {
315                 t.data = &statelessDataWrapper{td, make([]bool, t.Info.NumPieces())}
316         }
317         return
318 }
319
320 func (t *torrent) haveAllMetadataPieces() bool {
321         if t.haveInfo() {
322                 return true
323         }
324         if t.metadataHave == nil {
325                 return false
326         }
327         for _, have := range t.metadataHave {
328                 if !have {
329                         return false
330                 }
331         }
332         return true
333 }
334
335 func (t *torrent) SetMetadataSize(bytes int64) {
336         if t.MetaData != nil {
337                 return
338         }
339         if bytes > 10000000 { // 10MB, pulled from my ass.
340                 return
341         }
342         t.MetaData = make([]byte, bytes)
343         t.metadataHave = make([]bool, (bytes+(1<<14)-1)/(1<<14))
344 }
345
346 // The current working name for the torrent. Either the name in the info dict,
347 // or a display name given such as by the dn value in a magnet link, or "".
348 func (t *torrent) Name() string {
349         if t.haveInfo() {
350                 return t.Info.Name
351         }
352         if t.DisplayName != "" {
353                 return t.DisplayName
354         }
355         return ""
356 }
357
358 func (t *torrent) pieceStatusChar(index int) byte {
359         p := t.Pieces[index]
360         switch {
361         case t.pieceComplete(index):
362                 return 'C'
363         case p.QueuedForHash:
364                 return 'Q'
365         case p.Hashing:
366                 return 'H'
367         case !p.EverHashed:
368                 return '?'
369         case t.piecePartiallyDownloaded(index):
370                 switch p.Priority {
371                 case piecePriorityNone:
372                         return 'F' // Forgotten
373                 default:
374                         return 'P'
375                 }
376         default:
377                 switch p.Priority {
378                 case piecePriorityNone:
379                         return 'z'
380                 case piecePriorityNow:
381                         return '!'
382                 case piecePriorityReadahead:
383                         return 'R'
384                 case piecePriorityNext:
385                         return 'N'
386                 default:
387                         return '.'
388                 }
389         }
390 }
391
392 func (t *torrent) metadataPieceSize(piece int) int {
393         return metadataPieceSize(len(t.MetaData), piece)
394 }
395
396 func (t *torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
397         d := map[string]int{
398                 "msg_type": msgType,
399                 "piece":    piece,
400         }
401         if data != nil {
402                 d["total_size"] = len(t.MetaData)
403         }
404         p, err := bencode.Marshal(d)
405         if err != nil {
406                 panic(err)
407         }
408         return pp.Message{
409                 Type:            pp.Extended,
410                 ExtendedID:      byte(c.PeerExtensionIDs["ut_metadata"]),
411                 ExtendedPayload: append(p, data...),
412         }
413 }
414
415 type PieceStatusCharSequence struct {
416         Char  byte
417         Count int
418 }
419
420 func (t *torrent) PieceStatusCharSequences() []PieceStatusCharSequence {
421         t.stateMu.Lock()
422         defer t.stateMu.Unlock()
423         return t.pieceStatusCharSequences()
424 }
425
426 // Returns the length of sequences of identical piece status chars.
427 func (t *torrent) pieceStatusCharSequences() (ret []PieceStatusCharSequence) {
428         var (
429                 char  byte
430                 count int
431         )
432         writeSequence := func() {
433                 ret = append(ret, PieceStatusCharSequence{char, count})
434         }
435         if len(t.Pieces) != 0 {
436                 char = t.pieceStatusChar(0)
437         }
438         for index := range t.Pieces {
439                 char1 := t.pieceStatusChar(index)
440                 if char1 == char {
441                         count++
442                 } else {
443                         writeSequence()
444                         char = char1
445                         count = 1
446                 }
447         }
448         if count != 0 {
449                 writeSequence()
450         }
451         return
452 }
453
454 func (t *torrent) WriteStatus(w io.Writer) {
455         fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash)
456         fmt.Fprintf(w, "Piece length: %s\n", func() string {
457                 if t.haveInfo() {
458                         return fmt.Sprint(t.usualPieceSize())
459                 } else {
460                         return "?"
461                 }
462         }())
463         if t.haveInfo() {
464                 fmt.Fprint(w, "Pieces: ")
465                 for _, seq := range t.pieceStatusCharSequences() {
466                         fmt.Fprintf(w, "%d%c ", seq.Count, seq.Char)
467                 }
468                 fmt.Fprintln(w)
469         }
470         fmt.Fprintf(w, "Trackers: ")
471         for _, tier := range t.Trackers {
472                 for _, tr := range tier {
473                         fmt.Fprintf(w, "%q ", tr.String())
474                 }
475         }
476         fmt.Fprintf(w, "\n")
477         fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers))
478         fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen))
479         fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns))
480         sort.Sort(&worstConns{
481                 c: t.Conns,
482                 t: t,
483         })
484         for _, c := range t.Conns {
485                 c.WriteStatus(w, t)
486         }
487 }
488
489 func (t *torrent) String() string {
490         s := t.Name()
491         if s == "" {
492                 s = fmt.Sprintf("%x", t.InfoHash)
493         }
494         return s
495 }
496
497 func (t *torrent) haveInfo() bool {
498         return t.Info != nil
499 }
500
501 // TODO: Include URIs that weren't converted to tracker clients.
502 func (t *torrent) announceList() (al [][]string) {
503         for _, tier := range t.Trackers {
504                 var l []string
505                 for _, tr := range tier {
506                         l = append(l, tr.URL())
507                 }
508                 al = append(al, l)
509         }
510         return
511 }
512
513 func (t *torrent) MetaInfo() *metainfo.MetaInfo {
514         if t.MetaData == nil {
515                 panic("info bytes not set")
516         }
517         return &metainfo.MetaInfo{
518                 Info: metainfo.InfoEx{
519                         Info:  *t.Info,
520                         Bytes: t.MetaData,
521                 },
522                 CreationDate: time.Now().Unix(),
523                 Comment:      "dynamic metainfo from client",
524                 CreatedBy:    "go.torrent",
525                 AnnounceList: t.announceList(),
526         }
527 }
528
529 func (t *torrent) bytesLeft() (left int64) {
530         if !t.haveInfo() {
531                 return -1
532         }
533         for i := 0; i < t.numPieces(); i++ {
534                 left += int64(t.PieceNumPendingBytes(i))
535         }
536         return
537 }
538
539 func (t *torrent) piecePartiallyDownloaded(index int) bool {
540         return t.PieceNumPendingBytes(index) != t.PieceLength(index)
541 }
542
543 func numChunksForPiece(chunkSize int, pieceSize int) int {
544         return (pieceSize + chunkSize - 1) / chunkSize
545 }
546
547 func (t *torrent) usualPieceSize() int {
548         return int(t.Info.PieceLength)
549 }
550
551 func (t *torrent) lastPieceSize() int {
552         return int(t.PieceLength(t.numPieces() - 1))
553 }
554
555 func (t *torrent) numPieces() int {
556         return t.Info.NumPieces()
557 }
558
559 func (t *torrent) numPiecesCompleted() (num int) {
560         for i := range iter.N(t.Info.NumPieces()) {
561                 if t.pieceComplete(i) {
562                         num++
563                 }
564         }
565         return
566 }
567
568 func (t *torrent) Length() int64 {
569         return t.length
570 }
571
572 func (t *torrent) isClosed() bool {
573         select {
574         case <-t.closing:
575                 return true
576         default:
577                 return false
578         }
579 }
580
581 func (t *torrent) close() (err error) {
582         if t.isClosed() {
583                 return
584         }
585         t.ceaseNetworking()
586         close(t.closing)
587         if c, ok := t.data.(io.Closer); ok {
588                 c.Close()
589         }
590         for _, conn := range t.Conns {
591                 conn.Close()
592         }
593         return
594 }
595
596 // Return the request that would include the given offset into the torrent data.
597 func torrentOffsetRequest(torrentLength, pieceSize, chunkSize, offset int64) (
598         r request, ok bool) {
599         if offset < 0 || offset >= torrentLength {
600                 return
601         }
602         r.Index = pp.Integer(offset / pieceSize)
603         r.Begin = pp.Integer(offset % pieceSize / chunkSize * chunkSize)
604         left := torrentLength - int64(r.Index)*pieceSize - int64(r.Begin)
605         if chunkSize < left {
606                 r.Length = pp.Integer(chunkSize)
607         } else {
608                 r.Length = pp.Integer(left)
609         }
610         ok = true
611         return
612 }
613
614 func torrentRequestOffset(torrentLength, pieceSize int64, r request) (off int64) {
615         off = int64(r.Index)*pieceSize + int64(r.Begin)
616         if off < 0 || off >= torrentLength {
617                 panic("invalid request")
618         }
619         return
620 }
621
622 func (t *torrent) requestOffset(r request) int64 {
623         return torrentRequestOffset(t.Length(), int64(t.usualPieceSize()), r)
624 }
625
626 // Return the request that would include the given offset into the torrent data.
627 func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
628         return torrentOffsetRequest(t.Length(), t.Info.PieceLength, chunkSize, off)
629 }
630
631 func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
632         _, err = t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
633         return
634 }
635
636 func (t *torrent) bitfield() (bf []bool) {
637         for _, p := range t.Pieces {
638                 bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0)
639         }
640         return
641 }
642
643 func (t *torrent) pieceChunks(piece int) (css []chunkSpec) {
644         css = make([]chunkSpec, 0, (t.PieceLength(piece)+chunkSize-1)/chunkSize)
645         var cs chunkSpec
646         for left := t.PieceLength(piece); left != 0; left -= cs.Length {
647                 cs.Length = left
648                 if cs.Length > chunkSize {
649                         cs.Length = chunkSize
650                 }
651                 css = append(css, cs)
652                 cs.Begin += cs.Length
653         }
654         return
655 }
656
657 func (t *torrent) pendAllChunkSpecs(index int) {
658         piece := t.Pieces[index]
659         if piece.PendingChunkSpecs == nil {
660                 piece.PendingChunkSpecs = make(
661                         map[chunkSpec]struct{},
662                         (t.PieceLength(index)+chunkSize-1)/chunkSize)
663         }
664         pcss := piece.PendingChunkSpecs
665         for _, cs := range t.pieceChunks(int(index)) {
666                 pcss[cs] = struct{}{}
667         }
668         return
669 }
670
671 type Peer struct {
672         Id     [20]byte
673         IP     net.IP
674         Port   int
675         Source peerSource
676 }
677
678 func (t *torrent) PieceLength(piece int) (len_ pp.Integer) {
679         if int(piece) == t.numPieces()-1 {
680                 len_ = pp.Integer(t.Length() % t.Info.PieceLength)
681         }
682         if len_ == 0 {
683                 len_ = pp.Integer(t.Info.PieceLength)
684         }
685         return
686 }
687
688 func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) {
689         hash := pieceHash.New()
690         t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
691         util.CopyExact(ps[:], hash.Sum(nil))
692         return
693 }
694
695 func (t *torrent) haveAllPieces() bool {
696         if !t.haveInfo() {
697                 return false
698         }
699         for i := range t.Pieces {
700                 if !t.pieceComplete(i) {
701                         return false
702                 }
703         }
704         return true
705 }
706
707 func (me *torrent) haveAnyPieces() bool {
708         for i := range me.Pieces {
709                 if me.pieceComplete(i) {
710                         return true
711                 }
712         }
713         return false
714 }
715
716 func (t *torrent) havePiece(index int) bool {
717         return t.haveInfo() && t.pieceComplete(index)
718 }
719
720 func (t *torrent) haveChunk(r request) bool {
721         p := t.Pieces[r.Index]
722         if !p.EverHashed {
723                 return false
724         }
725         _, ok := p.PendingChunkSpecs[r.chunkSpec]
726         return !ok
727 }
728
729 func (t *torrent) wantChunk(r request) bool {
730         if !t.wantPiece(int(r.Index)) {
731                 return false
732         }
733         _, ok := t.Pieces[r.Index].PendingChunkSpecs[r.chunkSpec]
734         return ok
735 }
736
737 func (t *torrent) wantPiece(index int) bool {
738         if !t.haveInfo() {
739                 return false
740         }
741         p := t.Pieces[index]
742         return !t.pieceComplete(index) && p.Priority != piecePriorityNone && !p.QueuedForHash && !p.Hashing
743 }
744
745 func (t *torrent) connHasWantedPieces(c *connection) bool {
746         for p := range t.Pieces {
747                 if t.wantPiece(p) && c.PeerHasPiece(p) {
748                         return true
749                 }
750         }
751         return false
752 }
753
754 func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
755         for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
756                 pieces = append(pieces, int(i))
757         }
758         return
759 }