]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
50e2ed91c73f0566c18c48ed111757a3e9f6c861
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "fmt"
6         "io"
7         "log"
8         "net"
9         "sort"
10         "sync"
11         "time"
12
13         "github.com/anacrolix/missinggo"
14         "github.com/bradfitz/iter"
15
16         "github.com/anacrolix/torrent/bencode"
17         "github.com/anacrolix/torrent/data"
18         "github.com/anacrolix/torrent/metainfo"
19         pp "github.com/anacrolix/torrent/peer_protocol"
20         "github.com/anacrolix/torrent/tracker"
21         "github.com/anacrolix/torrent/util"
22 )
23
24 func (t *torrent) pieceNumPendingBytes(index int) (count pp.Integer) {
25         if t.pieceComplete(index) {
26                 return 0
27         }
28         piece := t.Pieces[index]
29         pieceLength := t.pieceLength(index)
30         if !piece.EverHashed {
31                 return pieceLength
32         }
33         for i, pending := range piece.PendingChunkSpecs {
34                 if pending {
35                         count += chunkIndexSpec(i, pieceLength).Length
36                 }
37         }
38         return
39 }
40
41 type peersKey struct {
42         IPBytes string
43         Port    int
44 }
45
46 // Data maintains per-piece persistent state.
47 type StatefulData interface {
48         data.Data
49         // We believe the piece data will pass a hash check.
50         PieceCompleted(index int) error
51         // Returns true if the piece is complete.
52         PieceComplete(index int) bool
53 }
54
55 // Is not aware of Client. Maintains state of torrent for with-in a Client.
56 type torrent struct {
57         stateMu sync.Mutex
58         closing chan struct{}
59
60         // Closed when no more network activity is desired. This includes
61         // announcing, and communicating with peers.
62         ceasingNetworking chan struct{}
63
64         InfoHash InfoHash
65         Pieces   []*piece
66         // Chunks that are wanted before all others. This is for
67         // responsive/streaming readers that want to unblock ASAP.
68         urgent map[request]struct{}
69         // Total length of the torrent in bytes. Stored because it's not O(1) to
70         // get this from the info dict.
71         length int64
72
73         data StatefulData
74
75         // The info dict. Nil if we don't have it (yet).
76         Info *metainfo.Info
77         // Active peer connections, running message stream loops.
78         Conns []*connection
79         // Set of addrs to which we're attempting to connect. Connections are
80         // half-open until all handshakes are completed.
81         HalfOpen map[string]struct{}
82
83         // Reserve of peers to connect to. A peer can be both here and in the
84         // active connections if were told about the peer after connecting with
85         // them. That encourages us to reconnect to peers that are well known.
86         Peers     map[peersKey]Peer
87         wantPeers sync.Cond
88
89         // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
90         // mirror their respective URLs from the announce-list metainfo key.
91         Trackers [][]tracker.Client
92         // Name used if the info name isn't available.
93         DisplayName string
94         // The bencoded bytes of the info dict.
95         MetaData []byte
96         // Each element corresponds to the 16KiB metadata pieces. If true, we have
97         // received that piece.
98         metadataHave []bool
99
100         // Closed when .Info is set.
101         gotMetainfo chan struct{}
102 }
103
104 func (t *torrent) pieceComplete(piece int) bool {
105         // TODO: This is called when setting metadata, and before storage is
106         // assigned, which doesn't seem right.
107         return t.data != nil && t.data.PieceComplete(piece)
108 }
109
110 func (t *torrent) numConnsUnchoked() (num int) {
111         for _, c := range t.Conns {
112                 if !c.PeerChoked {
113                         num++
114                 }
115         }
116         return
117 }
118
119 // There's a connection to that address already.
120 func (t *torrent) addrActive(addr string) bool {
121         if _, ok := t.HalfOpen[addr]; ok {
122                 return true
123         }
124         for _, c := range t.Conns {
125                 if c.remoteAddr().String() == addr {
126                         return true
127                 }
128         }
129         return false
130 }
131
132 func (t *torrent) worstConns(cl *Client) (wcs *worstConns) {
133         wcs = &worstConns{
134                 c:  make([]*connection, 0, len(t.Conns)),
135                 t:  t,
136                 cl: cl,
137         }
138         for _, c := range t.Conns {
139                 select {
140                 case <-c.closing:
141                 default:
142                         wcs.c = append(wcs.c, c)
143                 }
144         }
145         return
146 }
147
148 func (t *torrent) ceaseNetworking() {
149         t.stateMu.Lock()
150         defer t.stateMu.Unlock()
151         select {
152         case <-t.ceasingNetworking:
153                 return
154         default:
155         }
156         close(t.ceasingNetworking)
157         for _, c := range t.Conns {
158                 c.Close()
159         }
160 }
161
162 func (t *torrent) addPeer(p Peer) {
163         t.Peers[peersKey{string(p.IP), p.Port}] = p
164 }
165
166 func (t *torrent) invalidateMetadata() {
167         t.MetaData = nil
168         t.metadataHave = nil
169         t.Info = nil
170 }
171
172 func (t *torrent) saveMetadataPiece(index int, data []byte) {
173         if t.haveInfo() {
174                 return
175         }
176         if index >= len(t.metadataHave) {
177                 log.Printf("%s: ignoring metadata piece %d", t, index)
178                 return
179         }
180         copy(t.MetaData[(1<<14)*index:], data)
181         t.metadataHave[index] = true
182 }
183
184 func (t *torrent) metadataPieceCount() int {
185         return (len(t.MetaData) + (1 << 14) - 1) / (1 << 14)
186 }
187
188 func (t *torrent) haveMetadataPiece(piece int) bool {
189         if t.haveInfo() {
190                 return (1<<14)*piece < len(t.MetaData)
191         } else {
192                 return piece < len(t.metadataHave) && t.metadataHave[piece]
193         }
194 }
195
196 func (t *torrent) metadataSizeKnown() bool {
197         return t.MetaData != nil
198 }
199
200 func (t *torrent) metadataSize() int {
201         return len(t.MetaData)
202 }
203
204 func infoPieceHashes(info *metainfo.Info) (ret []string) {
205         for i := 0; i < len(info.Pieces); i += 20 {
206                 ret = append(ret, string(info.Pieces[i:i+20]))
207         }
208         return
209 }
210
211 // Called when metadata for a torrent becomes available.
212 func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte, eventLocker sync.Locker) (err error) {
213         err = validateInfo(md)
214         if err != nil {
215                 err = fmt.Errorf("bad info: %s", err)
216                 return
217         }
218         t.Info = md
219         t.length = 0
220         for _, f := range t.Info.UpvertedFiles() {
221                 t.length += f.Length
222         }
223         t.MetaData = infoBytes
224         t.metadataHave = nil
225         for _, hash := range infoPieceHashes(md) {
226                 piece := &piece{}
227                 piece.Event.L = eventLocker
228                 util.CopyExact(piece.Hash[:], hash)
229                 t.Pieces = append(t.Pieces, piece)
230         }
231         for _, conn := range t.Conns {
232                 t.initRequestOrdering(conn)
233                 if err := conn.setNumPieces(t.numPieces()); err != nil {
234                         log.Printf("closing connection: %s", err)
235                         conn.Close()
236                 }
237         }
238         return
239 }
240
241 func (t *torrent) setStorage(td data.Data) (err error) {
242         if c, ok := t.data.(io.Closer); ok {
243                 c.Close()
244         }
245         if sd, ok := td.(StatefulData); ok {
246                 t.data = sd
247         } else {
248                 t.data = &statelessDataWrapper{td, make([]bool, t.Info.NumPieces())}
249         }
250         return
251 }
252
253 func (t *torrent) haveAllMetadataPieces() bool {
254         if t.haveInfo() {
255                 return true
256         }
257         if t.metadataHave == nil {
258                 return false
259         }
260         for _, have := range t.metadataHave {
261                 if !have {
262                         return false
263                 }
264         }
265         return true
266 }
267
268 func (t *torrent) setMetadataSize(bytes int64, cl *Client) {
269         if t.haveInfo() {
270                 // We already know the correct metadata size.
271                 return
272         }
273         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
274                 log.Printf("received bad metadata size: %d", bytes)
275                 return
276         }
277         if t.MetaData != nil && len(t.MetaData) == int(bytes) {
278                 return
279         }
280         t.MetaData = make([]byte, bytes)
281         t.metadataHave = make([]bool, (bytes+(1<<14)-1)/(1<<14))
282         for _, c := range t.Conns {
283                 cl.requestPendingMetadata(t, c)
284         }
285
286 }
287
288 // The current working name for the torrent. Either the name in the info dict,
289 // or a display name given such as by the dn value in a magnet link, or "".
290 func (t *torrent) Name() string {
291         if t.haveInfo() {
292                 return t.Info.Name
293         }
294         if t.DisplayName != "" {
295                 return t.DisplayName
296         }
297         return ""
298 }
299
300 func (t *torrent) pieceState(index int) (ret PieceState) {
301         p := t.Pieces[index]
302         ret.Priority = p.Priority
303         if t.pieceComplete(index) {
304                 ret.Complete = true
305         }
306         if p.QueuedForHash || p.Hashing {
307                 ret.Checking = true
308         }
309         if t.piecePartiallyDownloaded(index) {
310                 ret.Partial = true
311         }
312         return
313 }
314
315 func (t *torrent) metadataPieceSize(piece int) int {
316         return metadataPieceSize(len(t.MetaData), piece)
317 }
318
319 func (t *torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
320         d := map[string]int{
321                 "msg_type": msgType,
322                 "piece":    piece,
323         }
324         if data != nil {
325                 d["total_size"] = len(t.MetaData)
326         }
327         p, err := bencode.Marshal(d)
328         if err != nil {
329                 panic(err)
330         }
331         return pp.Message{
332                 Type:            pp.Extended,
333                 ExtendedID:      byte(c.PeerExtensionIDs["ut_metadata"]),
334                 ExtendedPayload: append(p, data...),
335         }
336 }
337
338 func (t *torrent) pieceStateRuns() (ret []PieceStateRun) {
339         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
340                 ret = append(ret, PieceStateRun{
341                         PieceState: el.(PieceState),
342                         Length:     int(count),
343                 })
344         })
345         for index := range t.Pieces {
346                 rle.Append(t.pieceState(index), 1)
347         }
348         rle.Flush()
349         return
350 }
351
352 // Produces a small string representing a PieceStateRun.
353 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
354         ret = fmt.Sprintf("%d", psr.Length)
355         ret += func() string {
356                 switch psr.Priority {
357                 case PiecePriorityNext:
358                         return "N"
359                 case PiecePriorityNormal:
360                         return "."
361                 case PiecePriorityReadahead:
362                         return "R"
363                 case PiecePriorityNow:
364                         return "!"
365                 default:
366                         return ""
367                 }
368         }()
369         if psr.Checking {
370                 ret += "H"
371         }
372         if psr.Partial {
373                 ret += "P"
374         }
375         if psr.Complete {
376                 ret += "C"
377         }
378         return
379 }
380
381 func (t *torrent) writeStatus(w io.Writer, cl *Client) {
382         fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash)
383         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
384         if !t.haveInfo() {
385                 fmt.Fprintf(w, "Metadata have: ")
386                 for _, h := range t.metadataHave {
387                         fmt.Fprintf(w, "%c", func() rune {
388                                 if h {
389                                         return 'H'
390                                 } else {
391                                         return '.'
392                                 }
393                         }())
394                 }
395                 fmt.Fprintln(w)
396         }
397         fmt.Fprintf(w, "Piece length: %s\n", func() string {
398                 if t.haveInfo() {
399                         return fmt.Sprint(t.usualPieceSize())
400                 } else {
401                         return "?"
402                 }
403         }())
404         if t.haveInfo() {
405                 fmt.Fprint(w, "Pieces:")
406                 for _, psr := range t.pieceStateRuns() {
407                         w.Write([]byte(" "))
408                         w.Write([]byte(pieceStateRunStatusChars(psr)))
409                 }
410                 fmt.Fprintln(w)
411         }
412         fmt.Fprintf(w, "Urgent:")
413         for req := range t.urgent {
414                 fmt.Fprintf(w, " %v", req)
415         }
416         fmt.Fprintln(w)
417         fmt.Fprintf(w, "Trackers: ")
418         for _, tier := range t.Trackers {
419                 for _, tr := range tier {
420                         fmt.Fprintf(w, "%q ", tr.String())
421                 }
422         }
423         fmt.Fprintf(w, "\n")
424         fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers))
425         fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen))
426         fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns))
427         sort.Sort(&worstConns{
428                 c:  t.Conns,
429                 t:  t,
430                 cl: cl,
431         })
432         for i, c := range t.Conns {
433                 fmt.Fprintf(w, "%2d. ", i+1)
434                 c.WriteStatus(w, t)
435         }
436 }
437
438 func (t *torrent) String() string {
439         s := t.Name()
440         if s == "" {
441                 s = fmt.Sprintf("%x", t.InfoHash)
442         }
443         return s
444 }
445
446 func (t *torrent) haveInfo() bool {
447         return t != nil && t.Info != nil
448 }
449
450 // TODO: Include URIs that weren't converted to tracker clients.
451 func (t *torrent) announceList() (al [][]string) {
452         for _, tier := range t.Trackers {
453                 var l []string
454                 for _, tr := range tier {
455                         l = append(l, tr.URL())
456                 }
457                 al = append(al, l)
458         }
459         return
460 }
461
462 // Returns a run-time generated MetaInfo that includes the info bytes and
463 // announce-list as currently known to the client.
464 func (t *torrent) MetaInfo() *metainfo.MetaInfo {
465         if t.MetaData == nil {
466                 panic("info bytes not set")
467         }
468         return &metainfo.MetaInfo{
469                 Info: metainfo.InfoEx{
470                         Info:  *t.Info,
471                         Bytes: t.MetaData,
472                 },
473                 CreationDate: time.Now().Unix(),
474                 Comment:      "dynamic metainfo from client",
475                 CreatedBy:    "go.torrent",
476                 AnnounceList: t.announceList(),
477         }
478 }
479
480 func (t *torrent) bytesLeft() (left int64) {
481         if !t.haveInfo() {
482                 return -1
483         }
484         for i := 0; i < t.numPieces(); i++ {
485                 left += int64(t.pieceNumPendingBytes(i))
486         }
487         return
488 }
489
490 func (t *torrent) piecePartiallyDownloaded(index int) bool {
491         pendingBytes := t.pieceNumPendingBytes(index)
492         return pendingBytes != 0 && pendingBytes != t.pieceLength(index)
493 }
494
495 func numChunksForPiece(chunkSize int, pieceSize int) int {
496         return (pieceSize + chunkSize - 1) / chunkSize
497 }
498
499 func (t *torrent) usualPieceSize() int {
500         return int(t.Info.PieceLength)
501 }
502
503 func (t *torrent) lastPieceSize() int {
504         return int(t.pieceLength(t.numPieces() - 1))
505 }
506
507 func (t *torrent) numPieces() int {
508         return t.Info.NumPieces()
509 }
510
511 func (t *torrent) numPiecesCompleted() (num int) {
512         for i := range iter.N(t.Info.NumPieces()) {
513                 if t.pieceComplete(i) {
514                         num++
515                 }
516         }
517         return
518 }
519
520 func (t *torrent) Length() int64 {
521         return t.length
522 }
523
524 func (t *torrent) isClosed() bool {
525         select {
526         case <-t.closing:
527                 return true
528         default:
529                 return false
530         }
531 }
532
533 func (t *torrent) close() (err error) {
534         if t.isClosed() {
535                 return
536         }
537         t.ceaseNetworking()
538         close(t.closing)
539         if c, ok := t.data.(io.Closer); ok {
540                 c.Close()
541         }
542         for _, conn := range t.Conns {
543                 conn.Close()
544         }
545         return
546 }
547
548 func (t *torrent) requestOffset(r request) int64 {
549         return torrentRequestOffset(t.Length(), int64(t.usualPieceSize()), r)
550 }
551
552 // Return the request that would include the given offset into the torrent
553 // data. Returns !ok if there is no such request.
554 func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
555         return torrentOffsetRequest(t.Length(), t.Info.PieceLength, chunkSize, off)
556 }
557
558 func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
559         n, err := t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
560         if err == nil && n != len(data) {
561                 err = io.ErrShortWrite
562         }
563         return
564 }
565
566 func (t *torrent) bitfield() (bf []bool) {
567         for _, p := range t.Pieces {
568                 // TODO: Check this logic.
569                 bf = append(bf, p.EverHashed && p.numPendingChunks() == 0)
570         }
571         return
572 }
573
574 func (t *torrent) validOutgoingRequest(r request) bool {
575         if r.Index >= pp.Integer(t.Info.NumPieces()) {
576                 return false
577         }
578         if r.Begin%chunkSize != 0 {
579                 return false
580         }
581         if r.Length > chunkSize {
582                 return false
583         }
584         pieceLength := t.pieceLength(int(r.Index))
585         if r.Begin+r.Length > pieceLength {
586                 return false
587         }
588         return r.Length == chunkSize || r.Begin+r.Length == pieceLength
589 }
590
591 func (t *torrent) pieceChunks(piece int) (css []chunkSpec) {
592         css = make([]chunkSpec, 0, (t.pieceLength(piece)+chunkSize-1)/chunkSize)
593         var cs chunkSpec
594         for left := t.pieceLength(piece); left != 0; left -= cs.Length {
595                 cs.Length = left
596                 if cs.Length > chunkSize {
597                         cs.Length = chunkSize
598                 }
599                 css = append(css, cs)
600                 cs.Begin += cs.Length
601         }
602         return
603 }
604
605 func (t *torrent) pendAllChunkSpecs(pieceIndex int) {
606         piece := t.Pieces[pieceIndex]
607         if piece.PendingChunkSpecs == nil {
608                 // Allocate to exact size.
609                 piece.PendingChunkSpecs = make([]bool, (t.pieceLength(pieceIndex)+chunkSize-1)/chunkSize)
610         }
611         // Pend all the chunks.
612         pcss := piece.PendingChunkSpecs
613         for i := range pcss {
614                 pcss[i] = true
615         }
616         return
617 }
618
619 type Peer struct {
620         Id     [20]byte
621         IP     net.IP
622         Port   int
623         Source peerSource
624         // Peer is known to support encryption.
625         SupportsEncryption bool
626 }
627
628 func (t *torrent) pieceLength(piece int) (len_ pp.Integer) {
629         if int(piece) == t.numPieces()-1 {
630                 len_ = pp.Integer(t.Length() % t.Info.PieceLength)
631         }
632         if len_ == 0 {
633                 len_ = pp.Integer(t.Info.PieceLength)
634         }
635         return
636 }
637
638 func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) {
639         hash := pieceHash.New()
640         t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
641         util.CopyExact(ps[:], hash.Sum(nil))
642         return
643 }
644
645 func (t *torrent) haveAllPieces() bool {
646         if !t.haveInfo() {
647                 return false
648         }
649         for i := range t.Pieces {
650                 if !t.pieceComplete(i) {
651                         return false
652                 }
653         }
654         return true
655 }
656
657 func (me *torrent) haveAnyPieces() bool {
658         for i := range me.Pieces {
659                 if me.pieceComplete(i) {
660                         return true
661                 }
662         }
663         return false
664 }
665
666 func (t *torrent) havePiece(index int) bool {
667         return t.haveInfo() && t.pieceComplete(index)
668 }
669
670 func (t *torrent) haveChunk(r request) bool {
671         if !t.haveInfo() {
672                 return false
673         }
674         return !t.Pieces[r.Index].pendingChunk(r.chunkSpec)
675 }
676
677 func chunkIndex(cs chunkSpec) int {
678         return int(cs.Begin / chunkSize)
679 }
680
681 // TODO: This should probably be called wantPiece.
682 func (t *torrent) wantChunk(r request) bool {
683         if !t.wantPiece(int(r.Index)) {
684                 return false
685         }
686         if t.Pieces[r.Index].pendingChunk(r.chunkSpec) {
687                 return true
688         }
689         _, ok := t.urgent[r]
690         return ok
691 }
692
693 func (t *torrent) urgentChunkInPiece(piece int) bool {
694         p := pp.Integer(piece)
695         for req := range t.urgent {
696                 if req.Index == p {
697                         return true
698                 }
699         }
700         return false
701 }
702
703 // TODO: This should be called wantPieceIndex.
704 func (t *torrent) wantPiece(index int) bool {
705         if !t.haveInfo() {
706                 return false
707         }
708         p := t.Pieces[index]
709         if p.QueuedForHash {
710                 return false
711         }
712         if p.Hashing {
713                 return false
714         }
715         if p.Priority == PiecePriorityNone {
716                 if !t.urgentChunkInPiece(index) {
717                         return false
718                 }
719         }
720         // Put piece complete check last, since it's the slowest as it can involve
721         // calling out into external data stores.
722         return !t.pieceComplete(index)
723 }
724
725 func (t *torrent) connHasWantedPieces(c *connection) bool {
726         return c.pieceRequestOrder != nil && c.pieceRequestOrder.First() != nil
727 }
728
729 func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
730         for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
731                 pieces = append(pieces, int(i))
732         }
733         return
734 }
735
736 func (t *torrent) worstBadConn(cl *Client) *connection {
737         wcs := t.worstConns(cl)
738         heap.Init(wcs)
739         // A connection can only be bad if it's in the worst half, rounded down.
740         for wcs.Len() > (socketsPerTorrent+1)/2 {
741                 c := heap.Pop(wcs).(*connection)
742                 // Give connections 1 minute to prove themselves.
743                 if time.Since(c.completedHandshake) < time.Minute {
744                         continue
745                 }
746                 return c
747         }
748         return nil
749 }