]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
e32175e9429a4ab74159334aa95e326cc92da89c
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "fmt"
6         "io"
7         "log"
8         "net"
9         "sort"
10         "sync"
11         "time"
12
13         "github.com/anacrolix/missinggo"
14         "github.com/bradfitz/iter"
15
16         "github.com/anacrolix/torrent/bencode"
17         "github.com/anacrolix/torrent/data"
18         "github.com/anacrolix/torrent/metainfo"
19         pp "github.com/anacrolix/torrent/peer_protocol"
20         "github.com/anacrolix/torrent/tracker"
21         "github.com/anacrolix/torrent/util"
22 )
23
24 func (t *torrent) pieceNumPendingBytes(index int) (count pp.Integer) {
25         if t.pieceComplete(index) {
26                 return 0
27         }
28         piece := t.Pieces[index]
29         pieceLength := t.pieceLength(index)
30         if !piece.EverHashed {
31                 return pieceLength
32         }
33         for i, pending := range piece.PendingChunkSpecs {
34                 if pending {
35                         count += chunkIndexSpec(i, pieceLength).Length
36                 }
37         }
38         return
39 }
40
41 type peersKey struct {
42         IPBytes string
43         Port    int
44 }
45
46 // Data maintains per-piece persistent state.
47 type StatefulData interface {
48         data.Data
49         // We believe the piece data will pass a hash check.
50         PieceCompleted(index int) error
51         // Returns true if the piece is complete.
52         PieceComplete(index int) bool
53 }
54
55 // Is not aware of Client. Maintains state of torrent for with-in a Client.
56 type torrent struct {
57         stateMu sync.Mutex
58         closing chan struct{}
59
60         // Closed when no more network activity is desired. This includes
61         // announcing, and communicating with peers.
62         ceasingNetworking chan struct{}
63
64         InfoHash InfoHash
65         Pieces   []*piece
66         // Chunks that are wanted before all others. This is for
67         // responsive/streaming readers that want to unblock ASAP.
68         urgent map[request]struct{}
69         // Total length of the torrent in bytes. Stored because it's not O(1) to
70         // get this from the info dict.
71         length int64
72
73         data StatefulData
74
75         // The info dict. Nil if we don't have it (yet).
76         Info *metainfo.Info
77         // Active peer connections, running message stream loops.
78         Conns []*connection
79         // Set of addrs to which we're attempting to connect. Connections are
80         // half-open until all handshakes are completed.
81         HalfOpen map[string]struct{}
82
83         // Reserve of peers to connect to. A peer can be both here and in the
84         // active connections if were told about the peer after connecting with
85         // them. That encourages us to reconnect to peers that are well known.
86         Peers     map[peersKey]Peer
87         wantPeers sync.Cond
88
89         // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
90         // mirror their respective URLs from the announce-list metainfo key.
91         Trackers [][]tracker.Client
92         // Name used if the info name isn't available.
93         DisplayName string
94         // The bencoded bytes of the info dict.
95         MetaData []byte
96         // Each element corresponds to the 16KiB metadata pieces. If true, we have
97         // received that piece.
98         metadataHave []bool
99
100         // Closed when .Info is set.
101         gotMetainfo chan struct{}
102
103         pruneTimer *time.Timer
104 }
105
106 func (t *torrent) pieceComplete(piece int) bool {
107         // TODO: This is called when setting metadata, and before storage is
108         // assigned, which doesn't seem right.
109         return t.data != nil && t.data.PieceComplete(piece)
110 }
111
112 func (t *torrent) numConnsUnchoked() (num int) {
113         for _, c := range t.Conns {
114                 if !c.PeerChoked {
115                         num++
116                 }
117         }
118         return
119 }
120
121 // There's a connection to that address already.
122 func (t *torrent) addrActive(addr string) bool {
123         if _, ok := t.HalfOpen[addr]; ok {
124                 return true
125         }
126         for _, c := range t.Conns {
127                 if c.remoteAddr().String() == addr {
128                         return true
129                 }
130         }
131         return false
132 }
133
134 func (t *torrent) worstConnsHeap() (wcs *worstConns) {
135         wcs = &worstConns{
136                 c: append([]*connection{}, t.Conns...),
137                 t: t,
138         }
139         heap.Init(wcs)
140         return
141 }
142
143 func (t *torrent) ceaseNetworking() {
144         t.stateMu.Lock()
145         defer t.stateMu.Unlock()
146         select {
147         case <-t.ceasingNetworking:
148                 return
149         default:
150         }
151         close(t.ceasingNetworking)
152         for _, c := range t.Conns {
153                 c.Close()
154         }
155         if t.pruneTimer != nil {
156                 t.pruneTimer.Stop()
157         }
158 }
159
160 func (t *torrent) addPeer(p Peer) {
161         t.Peers[peersKey{string(p.IP), p.Port}] = p
162 }
163
164 func (t *torrent) invalidateMetadata() {
165         t.MetaData = nil
166         t.metadataHave = nil
167         t.Info = nil
168 }
169
170 func (t *torrent) saveMetadataPiece(index int, data []byte) {
171         if t.haveInfo() {
172                 return
173         }
174         if index >= len(t.metadataHave) {
175                 log.Printf("%s: ignoring metadata piece %d", t, index)
176                 return
177         }
178         copy(t.MetaData[(1<<14)*index:], data)
179         t.metadataHave[index] = true
180 }
181
182 func (t *torrent) metadataPieceCount() int {
183         return (len(t.MetaData) + (1 << 14) - 1) / (1 << 14)
184 }
185
186 func (t *torrent) haveMetadataPiece(piece int) bool {
187         if t.haveInfo() {
188                 return (1<<14)*piece < len(t.MetaData)
189         } else {
190                 return piece < len(t.metadataHave) && t.metadataHave[piece]
191         }
192 }
193
194 func (t *torrent) metadataSizeKnown() bool {
195         return t.MetaData != nil
196 }
197
198 func (t *torrent) metadataSize() int {
199         return len(t.MetaData)
200 }
201
202 func infoPieceHashes(info *metainfo.Info) (ret []string) {
203         for i := 0; i < len(info.Pieces); i += 20 {
204                 ret = append(ret, string(info.Pieces[i:i+20]))
205         }
206         return
207 }
208
209 // Called when metadata for a torrent becomes available.
210 func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte, eventLocker sync.Locker) (err error) {
211         err = validateInfo(md)
212         if err != nil {
213                 err = fmt.Errorf("bad info: %s", err)
214                 return
215         }
216         t.Info = md
217         t.length = 0
218         for _, f := range t.Info.UpvertedFiles() {
219                 t.length += f.Length
220         }
221         t.MetaData = infoBytes
222         t.metadataHave = nil
223         for _, hash := range infoPieceHashes(md) {
224                 piece := &piece{}
225                 piece.Event.L = eventLocker
226                 util.CopyExact(piece.Hash[:], hash)
227                 t.Pieces = append(t.Pieces, piece)
228         }
229         for _, conn := range t.Conns {
230                 t.initRequestOrdering(conn)
231                 if err := conn.setNumPieces(t.numPieces()); err != nil {
232                         log.Printf("closing connection: %s", err)
233                         conn.Close()
234                 }
235         }
236         return
237 }
238
239 func (t *torrent) setStorage(td data.Data) (err error) {
240         if c, ok := t.data.(io.Closer); ok {
241                 c.Close()
242         }
243         if sd, ok := td.(StatefulData); ok {
244                 t.data = sd
245         } else {
246                 t.data = &statelessDataWrapper{td, make([]bool, t.Info.NumPieces())}
247         }
248         return
249 }
250
251 func (t *torrent) haveAllMetadataPieces() bool {
252         if t.haveInfo() {
253                 return true
254         }
255         if t.metadataHave == nil {
256                 return false
257         }
258         for _, have := range t.metadataHave {
259                 if !have {
260                         return false
261                 }
262         }
263         return true
264 }
265
266 func (t *torrent) setMetadataSize(bytes int64, cl *Client) {
267         if t.haveInfo() {
268                 // We already know the correct metadata size.
269                 return
270         }
271         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
272                 log.Printf("received bad metadata size: %d", bytes)
273                 return
274         }
275         if t.MetaData != nil && len(t.MetaData) == int(bytes) {
276                 return
277         }
278         t.MetaData = make([]byte, bytes)
279         t.metadataHave = make([]bool, (bytes+(1<<14)-1)/(1<<14))
280         for _, c := range t.Conns {
281                 cl.requestPendingMetadata(t, c)
282         }
283
284 }
285
286 // The current working name for the torrent. Either the name in the info dict,
287 // or a display name given such as by the dn value in a magnet link, or "".
288 func (t *torrent) Name() string {
289         if t.haveInfo() {
290                 return t.Info.Name
291         }
292         if t.DisplayName != "" {
293                 return t.DisplayName
294         }
295         return ""
296 }
297
298 func (t *torrent) pieceState(index int) (ret PieceState) {
299         p := t.Pieces[index]
300         ret.Priority = p.Priority
301         if t.pieceComplete(index) {
302                 ret.Complete = true
303         }
304         if p.QueuedForHash || p.Hashing {
305                 ret.Checking = true
306         }
307         if t.piecePartiallyDownloaded(index) {
308                 ret.Partial = true
309         }
310         return
311 }
312
313 func (t *torrent) metadataPieceSize(piece int) int {
314         return metadataPieceSize(len(t.MetaData), piece)
315 }
316
317 func (t *torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
318         d := map[string]int{
319                 "msg_type": msgType,
320                 "piece":    piece,
321         }
322         if data != nil {
323                 d["total_size"] = len(t.MetaData)
324         }
325         p, err := bencode.Marshal(d)
326         if err != nil {
327                 panic(err)
328         }
329         return pp.Message{
330                 Type:            pp.Extended,
331                 ExtendedID:      byte(c.PeerExtensionIDs["ut_metadata"]),
332                 ExtendedPayload: append(p, data...),
333         }
334 }
335
336 func (t *torrent) pieceStateRuns() (ret []PieceStateRun) {
337         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
338                 ret = append(ret, PieceStateRun{
339                         PieceState: el.(PieceState),
340                         Length:     int(count),
341                 })
342         })
343         for index := range t.Pieces {
344                 rle.Append(t.pieceState(index), 1)
345         }
346         rle.Flush()
347         return
348 }
349
350 // Produces a small string representing a PieceStateRun.
351 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
352         ret = fmt.Sprintf("%d", psr.Length)
353         ret += func() string {
354                 switch psr.Priority {
355                 case PiecePriorityNext:
356                         return "N"
357                 case PiecePriorityNormal:
358                         return "."
359                 case PiecePriorityReadahead:
360                         return "R"
361                 case PiecePriorityNow:
362                         return "!"
363                 default:
364                         return ""
365                 }
366         }()
367         if psr.Checking {
368                 ret += "H"
369         }
370         if psr.Partial {
371                 ret += "P"
372         }
373         if psr.Complete {
374                 ret += "C"
375         }
376         return
377 }
378
379 func (t *torrent) writeStatus(w io.Writer) {
380         fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash)
381         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
382         fmt.Fprintf(w, "Metadata have: ")
383         for _, h := range t.metadataHave {
384                 fmt.Fprintf(w, "%c", func() rune {
385                         if h {
386                                 return 'H'
387                         } else {
388                                 return '.'
389                         }
390                 }())
391         }
392         fmt.Fprintln(w)
393         fmt.Fprintf(w, "Piece length: %s\n", func() string {
394                 if t.haveInfo() {
395                         return fmt.Sprint(t.usualPieceSize())
396                 } else {
397                         return "?"
398                 }
399         }())
400         if t.haveInfo() {
401                 fmt.Fprint(w, "Pieces:")
402                 for _, psr := range t.pieceStateRuns() {
403                         w.Write([]byte(" "))
404                         w.Write([]byte(pieceStateRunStatusChars(psr)))
405                 }
406                 fmt.Fprintln(w)
407         }
408         fmt.Fprintf(w, "Urgent:")
409         for req := range t.urgent {
410                 fmt.Fprintf(w, " %v", req)
411         }
412         fmt.Fprintln(w)
413         fmt.Fprintf(w, "Trackers: ")
414         for _, tier := range t.Trackers {
415                 for _, tr := range tier {
416                         fmt.Fprintf(w, "%q ", tr.String())
417                 }
418         }
419         fmt.Fprintf(w, "\n")
420         fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers))
421         fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen))
422         fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns))
423         sort.Sort(&worstConns{
424                 c: t.Conns,
425                 t: t,
426         })
427         for _, c := range t.Conns {
428                 c.WriteStatus(w, t)
429         }
430 }
431
432 func (t *torrent) String() string {
433         s := t.Name()
434         if s == "" {
435                 s = fmt.Sprintf("%x", t.InfoHash)
436         }
437         return s
438 }
439
440 func (t *torrent) haveInfo() bool {
441         return t.Info != nil
442 }
443
444 // TODO: Include URIs that weren't converted to tracker clients.
445 func (t *torrent) announceList() (al [][]string) {
446         for _, tier := range t.Trackers {
447                 var l []string
448                 for _, tr := range tier {
449                         l = append(l, tr.URL())
450                 }
451                 al = append(al, l)
452         }
453         return
454 }
455
456 // Returns a run-time generated MetaInfo that includes the info bytes and
457 // announce-list as currently known to the client.
458 func (t *torrent) MetaInfo() *metainfo.MetaInfo {
459         if t.MetaData == nil {
460                 panic("info bytes not set")
461         }
462         return &metainfo.MetaInfo{
463                 Info: metainfo.InfoEx{
464                         Info:  *t.Info,
465                         Bytes: t.MetaData,
466                 },
467                 CreationDate: time.Now().Unix(),
468                 Comment:      "dynamic metainfo from client",
469                 CreatedBy:    "go.torrent",
470                 AnnounceList: t.announceList(),
471         }
472 }
473
474 func (t *torrent) bytesLeft() (left int64) {
475         if !t.haveInfo() {
476                 return -1
477         }
478         for i := 0; i < t.numPieces(); i++ {
479                 left += int64(t.pieceNumPendingBytes(i))
480         }
481         return
482 }
483
484 func (t *torrent) piecePartiallyDownloaded(index int) bool {
485         pendingBytes := t.pieceNumPendingBytes(index)
486         return pendingBytes != 0 && pendingBytes != t.pieceLength(index)
487 }
488
489 func numChunksForPiece(chunkSize int, pieceSize int) int {
490         return (pieceSize + chunkSize - 1) / chunkSize
491 }
492
493 func (t *torrent) usualPieceSize() int {
494         return int(t.Info.PieceLength)
495 }
496
497 func (t *torrent) lastPieceSize() int {
498         return int(t.pieceLength(t.numPieces() - 1))
499 }
500
501 func (t *torrent) numPieces() int {
502         return t.Info.NumPieces()
503 }
504
505 func (t *torrent) numPiecesCompleted() (num int) {
506         for i := range iter.N(t.Info.NumPieces()) {
507                 if t.pieceComplete(i) {
508                         num++
509                 }
510         }
511         return
512 }
513
514 func (t *torrent) Length() int64 {
515         return t.length
516 }
517
518 func (t *torrent) isClosed() bool {
519         select {
520         case <-t.closing:
521                 return true
522         default:
523                 return false
524         }
525 }
526
527 func (t *torrent) close() (err error) {
528         if t.isClosed() {
529                 return
530         }
531         t.ceaseNetworking()
532         close(t.closing)
533         if c, ok := t.data.(io.Closer); ok {
534                 c.Close()
535         }
536         for _, conn := range t.Conns {
537                 conn.Close()
538         }
539         return
540 }
541
542 func (t *torrent) requestOffset(r request) int64 {
543         return torrentRequestOffset(t.Length(), int64(t.usualPieceSize()), r)
544 }
545
546 // Return the request that would include the given offset into the torrent
547 // data. Returns !ok if there is no such request.
548 func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
549         return torrentOffsetRequest(t.Length(), t.Info.PieceLength, chunkSize, off)
550 }
551
552 func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
553         n, err := t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
554         if err == nil && n != len(data) {
555                 err = io.ErrShortWrite
556         }
557         return
558 }
559
560 func (t *torrent) bitfield() (bf []bool) {
561         for _, p := range t.Pieces {
562                 // TODO: Check this logic.
563                 bf = append(bf, p.EverHashed && p.numPendingChunks() == 0)
564         }
565         return
566 }
567
568 func (t *torrent) validOutgoingRequest(r request) bool {
569         if r.Index >= pp.Integer(t.Info.NumPieces()) {
570                 return false
571         }
572         if r.Begin%chunkSize != 0 {
573                 return false
574         }
575         if r.Length > chunkSize {
576                 return false
577         }
578         pieceLength := t.pieceLength(int(r.Index))
579         if r.Begin+r.Length > pieceLength {
580                 return false
581         }
582         return r.Length == chunkSize || r.Begin+r.Length == pieceLength
583 }
584
585 func (t *torrent) pieceChunks(piece int) (css []chunkSpec) {
586         css = make([]chunkSpec, 0, (t.pieceLength(piece)+chunkSize-1)/chunkSize)
587         var cs chunkSpec
588         for left := t.pieceLength(piece); left != 0; left -= cs.Length {
589                 cs.Length = left
590                 if cs.Length > chunkSize {
591                         cs.Length = chunkSize
592                 }
593                 css = append(css, cs)
594                 cs.Begin += cs.Length
595         }
596         return
597 }
598
599 func (t *torrent) pendAllChunkSpecs(pieceIndex int) {
600         piece := t.Pieces[pieceIndex]
601         if piece.PendingChunkSpecs == nil {
602                 // Allocate to exact size.
603                 piece.PendingChunkSpecs = make([]bool, (t.pieceLength(pieceIndex)+chunkSize-1)/chunkSize)
604         }
605         // Pend all the chunks.
606         pcss := piece.PendingChunkSpecs
607         for i := range pcss {
608                 pcss[i] = true
609         }
610         return
611 }
612
613 type Peer struct {
614         Id     [20]byte
615         IP     net.IP
616         Port   int
617         Source peerSource
618         // Peer is known to support encryption.
619         SupportsEncryption bool
620 }
621
622 func (t *torrent) pieceLength(piece int) (len_ pp.Integer) {
623         if int(piece) == t.numPieces()-1 {
624                 len_ = pp.Integer(t.Length() % t.Info.PieceLength)
625         }
626         if len_ == 0 {
627                 len_ = pp.Integer(t.Info.PieceLength)
628         }
629         return
630 }
631
632 func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) {
633         hash := pieceHash.New()
634         t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
635         util.CopyExact(ps[:], hash.Sum(nil))
636         return
637 }
638
639 func (t *torrent) haveAllPieces() bool {
640         if !t.haveInfo() {
641                 return false
642         }
643         for i := range t.Pieces {
644                 if !t.pieceComplete(i) {
645                         return false
646                 }
647         }
648         return true
649 }
650
651 func (me *torrent) haveAnyPieces() bool {
652         for i := range me.Pieces {
653                 if me.pieceComplete(i) {
654                         return true
655                 }
656         }
657         return false
658 }
659
660 func (t *torrent) havePiece(index int) bool {
661         return t.haveInfo() && t.pieceComplete(index)
662 }
663
664 func (t *torrent) haveChunk(r request) bool {
665         if !t.haveInfo() {
666                 return false
667         }
668         return !t.Pieces[r.Index].pendingChunk(r.chunkSpec)
669 }
670
671 func chunkIndex(cs chunkSpec) int {
672         return int(cs.Begin / chunkSize)
673 }
674
675 // TODO: This should probably be called wantPiece.
676 func (t *torrent) wantChunk(r request) bool {
677         if !t.wantPiece(int(r.Index)) {
678                 return false
679         }
680         if t.Pieces[r.Index].pendingChunk(r.chunkSpec) {
681                 return true
682         }
683         _, ok := t.urgent[r]
684         return ok
685 }
686
687 func (t *torrent) urgentChunkInPiece(piece int) bool {
688         for req := range t.urgent {
689                 if int(req.Index) == piece {
690                         return true
691                 }
692         }
693         return false
694 }
695
696 // TODO: This should be called wantPieceIndex.
697 func (t *torrent) wantPiece(index int) bool {
698         if !t.haveInfo() {
699                 return false
700         }
701         p := t.Pieces[index]
702         if p.QueuedForHash {
703                 return false
704         }
705         if p.Hashing {
706                 return false
707         }
708         if p.Priority == PiecePriorityNone {
709                 if !t.urgentChunkInPiece(index) {
710                         return false
711                 }
712         }
713         // Put piece complete check last, since it's the slowest as it can involve
714         // calling out into external data stores.
715         return !t.pieceComplete(index)
716 }
717
718 func (t *torrent) connHasWantedPieces(c *connection) bool {
719         return c.pieceRequestOrder != nil && c.pieceRequestOrder.First() != nil
720 }
721
722 func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
723         for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
724                 pieces = append(pieces, int(i))
725         }
726         return
727 }