]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Rewrite handshaking and connection management
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "errors"
6         "fmt"
7         "io"
8         "log"
9         "net"
10         "os"
11         "sort"
12         "sync"
13         "time"
14
15         "github.com/bradfitz/iter"
16
17         "bitbucket.org/anacrolix/go.torrent/data"
18         pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
19         "bitbucket.org/anacrolix/go.torrent/tracker"
20         "bitbucket.org/anacrolix/go.torrent/util"
21         "github.com/anacrolix/libtorgo/bencode"
22         "github.com/anacrolix/libtorgo/metainfo"
23 )
24
25 func (t *torrent) PieceNumPendingBytes(index int) (count pp.Integer) {
26         if t.pieceComplete(index) {
27                 return 0
28         }
29         piece := t.Pieces[index]
30         if !piece.EverHashed {
31                 return t.PieceLength(index)
32         }
33         pendingChunks := t.Pieces[index].PendingChunkSpecs
34         count = pp.Integer(len(pendingChunks)) * chunkSize
35         _lastChunkSpec := lastChunkSpec(t.PieceLength(index))
36         if _lastChunkSpec.Length != chunkSize {
37                 if _, ok := pendingChunks[_lastChunkSpec]; ok {
38                         count += _lastChunkSpec.Length - chunkSize
39                 }
40         }
41         return
42 }
43
44 type peersKey struct {
45         IPBytes string
46         Port    int
47 }
48
49 // Data maintains per-piece persistent state.
50 type StatefulData interface {
51         data.Data
52         // We believe the piece data will pass a hash check.
53         PieceCompleted(index int) error
54         // Returns true if the piece is complete.
55         PieceComplete(index int) bool
56 }
57
58 // Is not aware of Client. Maintains state of torrent for with-in a Client.
59 type torrent struct {
60         stateMu sync.Mutex
61         closing chan struct{}
62
63         // Closed when no more network activity is desired. This includes
64         // announcing, and communicating with peers.
65         ceasingNetworking chan struct{}
66
67         InfoHash InfoHash
68         Pieces   []*piece
69         // Total length of the torrent in bytes. Stored because it's not O(1) to
70         // get this from the info dict.
71         length int64
72
73         data StatefulData
74
75         // The info dict. Nil if we don't have it.
76         Info *metainfo.Info
77         // Active peer connections, running message stream loops.
78         Conns []*connection
79         // Set of addrs to which we're attempting to connect. Connections are
80         // half-open until all handshakes are completed.
81         HalfOpen map[string]struct{}
82
83         // Reserve of peers to connect to. A peer can be both here and in the
84         // active connections if were told about the peer after connecting with
85         // them. That encourages us to reconnect to peers that are well known.
86         Peers     map[peersKey]Peer
87         wantPeers sync.Cond
88
89         // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
90         // mirror their respective URLs from the announce-list metainfo key.
91         Trackers [][]tracker.Client
92         // Name used if the info name isn't available.
93         DisplayName string
94         // The bencoded bytes of the info dict.
95         MetaData []byte
96         // Each element corresponds to the 16KiB metadata pieces. If true, we have
97         // received that piece.
98         metadataHave []bool
99
100         // Closed when .Info is set.
101         gotMetainfo chan struct{}
102         GotMetainfo <-chan struct{}
103
104         pruneTimer *time.Timer
105 }
106
107 func (t *torrent) pieceComplete(piece int) bool {
108         // TODO: This is called when setting metadata, and before storage is
109         // assigned, which doesn't seem right.
110         return t.data != nil && t.data.PieceComplete(piece)
111 }
112
113 // A file-like handle to torrent data that implements SectionOpener. Opened
114 // sections will be reused so long as Reads and ReadAt's are contiguous.
115 type handle struct {
116         rc     io.ReadCloser
117         rcOff  int64
118         curOff int64
119         so     SectionOpener
120         size   int64
121         t      Torrent
122 }
123
124 func (h *handle) Close() error {
125         if h.rc != nil {
126                 return h.rc.Close()
127         }
128         return nil
129 }
130
131 func (h *handle) ReadAt(b []byte, off int64) (n int, err error) {
132         return h.readAt(b, off)
133 }
134
135 func (h *handle) readAt(b []byte, off int64) (n int, err error) {
136         avail := h.t.prepareRead(off)
137         if int64(len(b)) > avail {
138                 b = b[:avail]
139         }
140         if int64(len(b)) > h.size-off {
141                 b = b[:h.size-off]
142         }
143         if h.rcOff != off && h.rc != nil {
144                 h.rc.Close()
145                 h.rc = nil
146         }
147         if h.rc == nil {
148                 h.rc, err = h.so.OpenSection(off, h.size-off)
149                 if err != nil {
150                         return
151                 }
152                 h.rcOff = off
153         }
154         n, err = h.rc.Read(b)
155         h.rcOff += int64(n)
156         return
157 }
158
159 func (h *handle) Read(b []byte) (n int, err error) {
160         n, err = h.readAt(b, h.curOff)
161         h.curOff = h.rcOff
162         return
163 }
164
165 func (h *handle) Seek(off int64, whence int) (newOff int64, err error) {
166         switch whence {
167         case os.SEEK_SET:
168                 h.curOff = off
169         case os.SEEK_CUR:
170                 h.curOff += off
171         case os.SEEK_END:
172                 h.curOff = h.size + off
173         default:
174                 err = errors.New("bad whence")
175         }
176         newOff = h.curOff
177         return
178 }
179
180 // Implements Handle on top of an io.SectionReader.
181 type sectionReaderHandle struct {
182         *io.SectionReader
183 }
184
185 func (sectionReaderHandle) Close() error { return nil }
186
187 func (T Torrent) NewReadHandle() Handle {
188         if so, ok := T.data.(SectionOpener); ok {
189                 return &handle{
190                         so:   so,
191                         size: T.Length(),
192                         t:    T,
193                 }
194         }
195         return sectionReaderHandle{io.NewSectionReader(T, 0, T.Length())}
196 }
197
198 func (t *torrent) numConnsUnchoked() (num int) {
199         for _, c := range t.Conns {
200                 if !c.PeerChoked {
201                         num++
202                 }
203         }
204         return
205 }
206
207 // There's a connection to that address already.
208 func (t *torrent) addrActive(addr string) bool {
209         if _, ok := t.HalfOpen[addr]; ok {
210                 return true
211         }
212         for _, c := range t.Conns {
213                 if c.remoteAddr().String() == addr {
214                         return true
215                 }
216         }
217         return false
218 }
219
220 func (t *torrent) worstConnsHeap() (wcs *worstConns) {
221         wcs = &worstConns{
222                 c: append([]*connection{}, t.Conns...),
223                 t: t,
224         }
225         heap.Init(wcs)
226         return
227 }
228
229 func (t *torrent) ceaseNetworking() {
230         t.stateMu.Lock()
231         defer t.stateMu.Unlock()
232         select {
233         case <-t.ceasingNetworking:
234                 return
235         default:
236         }
237         close(t.ceasingNetworking)
238         for _, c := range t.Conns {
239                 c.Close()
240         }
241         t.pruneTimer.Stop()
242 }
243
244 func (t *torrent) addPeer(p Peer) {
245         t.Peers[peersKey{string(p.IP), p.Port}] = p
246 }
247
248 func (t *torrent) invalidateMetadata() {
249         t.MetaData = nil
250         t.metadataHave = nil
251         t.Info = nil
252 }
253
254 func (t *torrent) SaveMetadataPiece(index int, data []byte) {
255         if t.haveInfo() {
256                 return
257         }
258         if index >= len(t.metadataHave) {
259                 log.Printf("%s: ignoring metadata piece %d", t, index)
260                 return
261         }
262         copy(t.MetaData[(1<<14)*index:], data)
263         t.metadataHave[index] = true
264 }
265
266 func (t *torrent) metadataPieceCount() int {
267         return (len(t.MetaData) + (1 << 14) - 1) / (1 << 14)
268 }
269
270 func (t *torrent) haveMetadataPiece(piece int) bool {
271         if t.haveInfo() {
272                 return (1<<14)*piece < len(t.MetaData)
273         } else {
274                 return piece < len(t.metadataHave) && t.metadataHave[piece]
275         }
276 }
277
278 func (t *torrent) metadataSizeKnown() bool {
279         return t.MetaData != nil
280 }
281
282 func (t *torrent) metadataSize() int {
283         return len(t.MetaData)
284 }
285
286 func infoPieceHashes(info *metainfo.Info) (ret []string) {
287         for i := 0; i < len(info.Pieces); i += 20 {
288                 ret = append(ret, string(info.Pieces[i:i+20]))
289         }
290         return
291 }
292
293 // Called when metadata for a torrent becomes available.
294 func (t *torrent) setMetadata(md metainfo.Info, infoBytes []byte, eventLocker sync.Locker) (err error) {
295         t.Info = &md
296         t.length = 0
297         for _, f := range t.Info.UpvertedFiles() {
298                 t.length += f.Length
299         }
300         t.MetaData = infoBytes
301         t.metadataHave = nil
302         for _, hash := range infoPieceHashes(&md) {
303                 piece := &piece{}
304                 piece.Event.L = eventLocker
305                 util.CopyExact(piece.Hash[:], hash)
306                 t.Pieces = append(t.Pieces, piece)
307         }
308         for _, conn := range t.Conns {
309                 t.initRequestOrdering(conn)
310                 if err := conn.setNumPieces(t.numPieces()); err != nil {
311                         log.Printf("closing connection: %s", err)
312                         conn.Close()
313                 }
314         }
315         return
316 }
317
318 func (t *torrent) setStorage(td data.Data) (err error) {
319         if c, ok := t.data.(io.Closer); ok {
320                 c.Close()
321         }
322         if sd, ok := td.(StatefulData); ok {
323                 t.data = sd
324         } else {
325                 t.data = &statelessDataWrapper{td, make([]bool, t.Info.NumPieces())}
326         }
327         return
328 }
329
330 func (t *torrent) haveAllMetadataPieces() bool {
331         if t.haveInfo() {
332                 return true
333         }
334         if t.metadataHave == nil {
335                 return false
336         }
337         for _, have := range t.metadataHave {
338                 if !have {
339                         return false
340                 }
341         }
342         return true
343 }
344
345 func (t *torrent) SetMetadataSize(bytes int64) {
346         if t.MetaData != nil {
347                 return
348         }
349         if bytes > 10000000 { // 10MB, pulled from my ass.
350                 return
351         }
352         t.MetaData = make([]byte, bytes)
353         t.metadataHave = make([]bool, (bytes+(1<<14)-1)/(1<<14))
354 }
355
356 // The current working name for the torrent. Either the name in the info dict,
357 // or a display name given such as by the dn value in a magnet link, or "".
358 func (t *torrent) Name() string {
359         if t.haveInfo() {
360                 return t.Info.Name
361         }
362         if t.DisplayName != "" {
363                 return t.DisplayName
364         }
365         return ""
366 }
367
368 func (t *torrent) pieceStatusChar(index int) byte {
369         p := t.Pieces[index]
370         switch {
371         case t.pieceComplete(index):
372                 return 'C'
373         case p.QueuedForHash:
374                 return 'Q'
375         case p.Hashing:
376                 return 'H'
377         case !p.EverHashed:
378                 return '?'
379         case t.piecePartiallyDownloaded(index):
380                 switch p.Priority {
381                 case piecePriorityNone:
382                         return 'F' // Forgotten
383                 default:
384                         return 'P'
385                 }
386         default:
387                 switch p.Priority {
388                 case piecePriorityNone:
389                         return 'z'
390                 case piecePriorityNow:
391                         return '!'
392                 case piecePriorityReadahead:
393                         return 'R'
394                 case piecePriorityNext:
395                         return 'N'
396                 default:
397                         return '.'
398                 }
399         }
400 }
401
402 func (t *torrent) metadataPieceSize(piece int) int {
403         return metadataPieceSize(len(t.MetaData), piece)
404 }
405
406 func (t *torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
407         d := map[string]int{
408                 "msg_type": msgType,
409                 "piece":    piece,
410         }
411         if data != nil {
412                 d["total_size"] = len(t.MetaData)
413         }
414         p, err := bencode.Marshal(d)
415         if err != nil {
416                 panic(err)
417         }
418         return pp.Message{
419                 Type:            pp.Extended,
420                 ExtendedID:      byte(c.PeerExtensionIDs["ut_metadata"]),
421                 ExtendedPayload: append(p, data...),
422         }
423 }
424
425 type PieceStatusCharSequence struct {
426         Char  byte
427         Count int
428 }
429
430 func (t *torrent) PieceStatusCharSequences() []PieceStatusCharSequence {
431         t.stateMu.Lock()
432         defer t.stateMu.Unlock()
433         return t.pieceStatusCharSequences()
434 }
435
436 // Returns the length of sequences of identical piece status chars.
437 func (t *torrent) pieceStatusCharSequences() (ret []PieceStatusCharSequence) {
438         var (
439                 char  byte
440                 count int
441         )
442         writeSequence := func() {
443                 ret = append(ret, PieceStatusCharSequence{char, count})
444         }
445         if len(t.Pieces) != 0 {
446                 char = t.pieceStatusChar(0)
447         }
448         for index := range t.Pieces {
449                 char1 := t.pieceStatusChar(index)
450                 if char1 == char {
451                         count++
452                 } else {
453                         writeSequence()
454                         char = char1
455                         count = 1
456                 }
457         }
458         if count != 0 {
459                 writeSequence()
460         }
461         return
462 }
463
464 func (t *torrent) WriteStatus(w io.Writer) {
465         fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash)
466         fmt.Fprintf(w, "Piece length: %s\n", func() string {
467                 if t.haveInfo() {
468                         return fmt.Sprint(t.usualPieceSize())
469                 } else {
470                         return "?"
471                 }
472         }())
473         if t.haveInfo() {
474                 fmt.Fprint(w, "Pieces: ")
475                 for _, seq := range t.pieceStatusCharSequences() {
476                         fmt.Fprintf(w, "%d%c ", seq.Count, seq.Char)
477                 }
478                 fmt.Fprintln(w)
479         }
480         fmt.Fprintf(w, "Trackers: ")
481         for _, tier := range t.Trackers {
482                 for _, tr := range tier {
483                         fmt.Fprintf(w, "%q ", tr.String())
484                 }
485         }
486         fmt.Fprintf(w, "\n")
487         fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers))
488         fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen))
489         fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns))
490         sort.Sort(&worstConns{
491                 c: t.Conns,
492                 t: t,
493         })
494         for _, c := range t.Conns {
495                 c.WriteStatus(w, t)
496         }
497 }
498
499 func (t *torrent) String() string {
500         s := t.Name()
501         if s == "" {
502                 s = fmt.Sprintf("%x", t.InfoHash)
503         }
504         return s
505 }
506
507 func (t *torrent) haveInfo() bool {
508         return t.Info != nil
509 }
510
511 // TODO: Include URIs that weren't converted to tracker clients.
512 func (t *torrent) announceList() (al [][]string) {
513         for _, tier := range t.Trackers {
514                 var l []string
515                 for _, tr := range tier {
516                         l = append(l, tr.URL())
517                 }
518                 al = append(al, l)
519         }
520         return
521 }
522
523 func (t *torrent) MetaInfo() *metainfo.MetaInfo {
524         if t.MetaData == nil {
525                 panic("info bytes not set")
526         }
527         return &metainfo.MetaInfo{
528                 Info: metainfo.InfoEx{
529                         Info:  *t.Info,
530                         Bytes: t.MetaData,
531                 },
532                 CreationDate: time.Now().Unix(),
533                 Comment:      "dynamic metainfo from client",
534                 CreatedBy:    "go.torrent",
535                 AnnounceList: t.announceList(),
536         }
537 }
538
539 func (t *torrent) bytesLeft() (left int64) {
540         if !t.haveInfo() {
541                 return -1
542         }
543         for i := 0; i < t.numPieces(); i++ {
544                 left += int64(t.PieceNumPendingBytes(i))
545         }
546         return
547 }
548
549 func (t *torrent) piecePartiallyDownloaded(index int) bool {
550         return t.PieceNumPendingBytes(index) != t.PieceLength(index)
551 }
552
553 func numChunksForPiece(chunkSize int, pieceSize int) int {
554         return (pieceSize + chunkSize - 1) / chunkSize
555 }
556
557 func (t *torrent) usualPieceSize() int {
558         return int(t.Info.PieceLength)
559 }
560
561 func (t *torrent) lastPieceSize() int {
562         return int(t.PieceLength(t.numPieces() - 1))
563 }
564
565 func (t *torrent) numPieces() int {
566         return t.Info.NumPieces()
567 }
568
569 func (t *torrent) numPiecesCompleted() (num int) {
570         for i := range iter.N(t.Info.NumPieces()) {
571                 if t.pieceComplete(i) {
572                         num++
573                 }
574         }
575         return
576 }
577
578 func (t *torrent) Length() int64 {
579         return t.length
580 }
581
582 func (t *torrent) isClosed() bool {
583         select {
584         case <-t.closing:
585                 return true
586         default:
587                 return false
588         }
589 }
590
591 func (t *torrent) close() (err error) {
592         if t.isClosed() {
593                 return
594         }
595         t.ceaseNetworking()
596         close(t.closing)
597         if c, ok := t.data.(io.Closer); ok {
598                 c.Close()
599         }
600         for _, conn := range t.Conns {
601                 conn.Close()
602         }
603         return
604 }
605
606 // Return the request that would include the given offset into the torrent data.
607 func torrentOffsetRequest(torrentLength, pieceSize, chunkSize, offset int64) (
608         r request, ok bool) {
609         if offset < 0 || offset >= torrentLength {
610                 return
611         }
612         r.Index = pp.Integer(offset / pieceSize)
613         r.Begin = pp.Integer(offset % pieceSize / chunkSize * chunkSize)
614         left := torrentLength - int64(r.Index)*pieceSize - int64(r.Begin)
615         if chunkSize < left {
616                 r.Length = pp.Integer(chunkSize)
617         } else {
618                 r.Length = pp.Integer(left)
619         }
620         ok = true
621         return
622 }
623
624 func torrentRequestOffset(torrentLength, pieceSize int64, r request) (off int64) {
625         off = int64(r.Index)*pieceSize + int64(r.Begin)
626         if off < 0 || off >= torrentLength {
627                 panic("invalid request")
628         }
629         return
630 }
631
632 func (t *torrent) requestOffset(r request) int64 {
633         return torrentRequestOffset(t.Length(), int64(t.usualPieceSize()), r)
634 }
635
636 // Return the request that would include the given offset into the torrent data.
637 func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
638         return torrentOffsetRequest(t.Length(), t.Info.PieceLength, chunkSize, off)
639 }
640
641 func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
642         _, err = t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
643         return
644 }
645
646 func (t *torrent) bitfield() (bf []bool) {
647         for _, p := range t.Pieces {
648                 bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0)
649         }
650         return
651 }
652
653 func (t *torrent) pieceChunks(piece int) (css []chunkSpec) {
654         css = make([]chunkSpec, 0, (t.PieceLength(piece)+chunkSize-1)/chunkSize)
655         var cs chunkSpec
656         for left := t.PieceLength(piece); left != 0; left -= cs.Length {
657                 cs.Length = left
658                 if cs.Length > chunkSize {
659                         cs.Length = chunkSize
660                 }
661                 css = append(css, cs)
662                 cs.Begin += cs.Length
663         }
664         return
665 }
666
667 func (t *torrent) pendAllChunkSpecs(index int) {
668         piece := t.Pieces[index]
669         if piece.PendingChunkSpecs == nil {
670                 piece.PendingChunkSpecs = make(
671                         map[chunkSpec]struct{},
672                         (t.PieceLength(index)+chunkSize-1)/chunkSize)
673         }
674         pcss := piece.PendingChunkSpecs
675         for _, cs := range t.pieceChunks(int(index)) {
676                 pcss[cs] = struct{}{}
677         }
678         return
679 }
680
681 type Peer struct {
682         Id     [20]byte
683         IP     net.IP
684         Port   int
685         Source peerSource
686 }
687
688 func (t *torrent) PieceLength(piece int) (len_ pp.Integer) {
689         if int(piece) == t.numPieces()-1 {
690                 len_ = pp.Integer(t.Length() % t.Info.PieceLength)
691         }
692         if len_ == 0 {
693                 len_ = pp.Integer(t.Info.PieceLength)
694         }
695         return
696 }
697
698 func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) {
699         hash := pieceHash.New()
700         t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
701         util.CopyExact(ps[:], hash.Sum(nil))
702         return
703 }
704
705 func (t *torrent) haveAllPieces() bool {
706         if !t.haveInfo() {
707                 return false
708         }
709         for i := range t.Pieces {
710                 if !t.pieceComplete(i) {
711                         return false
712                 }
713         }
714         return true
715 }
716
717 func (me *torrent) haveAnyPieces() bool {
718         for i := range me.Pieces {
719                 if me.pieceComplete(i) {
720                         return true
721                 }
722         }
723         return false
724 }
725
726 func (t *torrent) havePiece(index int) bool {
727         return t.haveInfo() && t.pieceComplete(index)
728 }
729
730 func (t *torrent) haveChunk(r request) bool {
731         p := t.Pieces[r.Index]
732         if !p.EverHashed {
733                 return false
734         }
735         _, ok := p.PendingChunkSpecs[r.chunkSpec]
736         return !ok
737 }
738
739 func (t *torrent) wantChunk(r request) bool {
740         if !t.wantPiece(int(r.Index)) {
741                 return false
742         }
743         _, ok := t.Pieces[r.Index].PendingChunkSpecs[r.chunkSpec]
744         return ok
745 }
746
747 func (t *torrent) wantPiece(index int) bool {
748         if !t.haveInfo() {
749                 return false
750         }
751         p := t.Pieces[index]
752         return !t.pieceComplete(index) && p.Priority != piecePriorityNone && !p.QueuedForHash && !p.Hashing
753 }
754
755 func (t *torrent) connHasWantedPieces(c *connection) bool {
756         for p := range t.Pieces {
757                 if t.wantPiece(p) && c.PeerHasPiece(p) {
758                         return true
759                 }
760         }
761         return false
762 }
763
764 func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
765         for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
766                 pieces = append(pieces, int(i))
767         }
768         return
769 }