]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Replace pruning timer with as-required connection dropping
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "fmt"
6         "io"
7         "log"
8         "net"
9         "sort"
10         "sync"
11         "time"
12
13         "github.com/anacrolix/missinggo"
14         "github.com/bradfitz/iter"
15
16         "github.com/anacrolix/torrent/bencode"
17         "github.com/anacrolix/torrent/data"
18         "github.com/anacrolix/torrent/metainfo"
19         pp "github.com/anacrolix/torrent/peer_protocol"
20         "github.com/anacrolix/torrent/tracker"
21         "github.com/anacrolix/torrent/util"
22 )
23
24 func (t *torrent) pieceNumPendingBytes(index int) (count pp.Integer) {
25         if t.pieceComplete(index) {
26                 return 0
27         }
28         piece := t.Pieces[index]
29         pieceLength := t.pieceLength(index)
30         if !piece.EverHashed {
31                 return pieceLength
32         }
33         for i, pending := range piece.PendingChunkSpecs {
34                 if pending {
35                         count += chunkIndexSpec(i, pieceLength).Length
36                 }
37         }
38         return
39 }
40
41 type peersKey struct {
42         IPBytes string
43         Port    int
44 }
45
46 // Data maintains per-piece persistent state.
47 type StatefulData interface {
48         data.Data
49         // We believe the piece data will pass a hash check.
50         PieceCompleted(index int) error
51         // Returns true if the piece is complete.
52         PieceComplete(index int) bool
53 }
54
55 // Is not aware of Client. Maintains state of torrent for with-in a Client.
56 type torrent struct {
57         stateMu sync.Mutex
58         closing chan struct{}
59
60         // Closed when no more network activity is desired. This includes
61         // announcing, and communicating with peers.
62         ceasingNetworking chan struct{}
63
64         InfoHash InfoHash
65         Pieces   []*piece
66         // Chunks that are wanted before all others. This is for
67         // responsive/streaming readers that want to unblock ASAP.
68         urgent map[request]struct{}
69         // Total length of the torrent in bytes. Stored because it's not O(1) to
70         // get this from the info dict.
71         length int64
72
73         data StatefulData
74
75         // The info dict. Nil if we don't have it (yet).
76         Info *metainfo.Info
77         // Active peer connections, running message stream loops.
78         Conns []*connection
79         // Set of addrs to which we're attempting to connect. Connections are
80         // half-open until all handshakes are completed.
81         HalfOpen map[string]struct{}
82
83         // Reserve of peers to connect to. A peer can be both here and in the
84         // active connections if were told about the peer after connecting with
85         // them. That encourages us to reconnect to peers that are well known.
86         Peers     map[peersKey]Peer
87         wantPeers sync.Cond
88
89         // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
90         // mirror their respective URLs from the announce-list metainfo key.
91         Trackers [][]tracker.Client
92         // Name used if the info name isn't available.
93         DisplayName string
94         // The bencoded bytes of the info dict.
95         MetaData []byte
96         // Each element corresponds to the 16KiB metadata pieces. If true, we have
97         // received that piece.
98         metadataHave []bool
99
100         // Closed when .Info is set.
101         gotMetainfo chan struct{}
102 }
103
104 func (t *torrent) pieceComplete(piece int) bool {
105         // TODO: This is called when setting metadata, and before storage is
106         // assigned, which doesn't seem right.
107         return t.data != nil && t.data.PieceComplete(piece)
108 }
109
110 func (t *torrent) numConnsUnchoked() (num int) {
111         for _, c := range t.Conns {
112                 if !c.PeerChoked {
113                         num++
114                 }
115         }
116         return
117 }
118
119 // There's a connection to that address already.
120 func (t *torrent) addrActive(addr string) bool {
121         if _, ok := t.HalfOpen[addr]; ok {
122                 return true
123         }
124         for _, c := range t.Conns {
125                 if c.remoteAddr().String() == addr {
126                         return true
127                 }
128         }
129         return false
130 }
131
132 func (t *torrent) worstConns(cl *Client) (wcs *worstConns) {
133         wcs = &worstConns{
134                 c:  make([]*connection, 0, len(t.Conns)),
135                 t:  t,
136                 cl: cl,
137         }
138         for _, c := range t.Conns {
139                 select {
140                 case <-c.closing:
141                 default:
142                         wcs.c = append(wcs.c, c)
143                 }
144         }
145         return
146 }
147
148 func (t *torrent) ceaseNetworking() {
149         t.stateMu.Lock()
150         defer t.stateMu.Unlock()
151         select {
152         case <-t.ceasingNetworking:
153                 return
154         default:
155         }
156         close(t.ceasingNetworking)
157         for _, c := range t.Conns {
158                 c.Close()
159         }
160 }
161
162 func (t *torrent) addPeer(p Peer) {
163         t.Peers[peersKey{string(p.IP), p.Port}] = p
164 }
165
166 func (t *torrent) invalidateMetadata() {
167         t.MetaData = nil
168         t.metadataHave = nil
169         t.Info = nil
170 }
171
172 func (t *torrent) saveMetadataPiece(index int, data []byte) {
173         if t.haveInfo() {
174                 return
175         }
176         if index >= len(t.metadataHave) {
177                 log.Printf("%s: ignoring metadata piece %d", t, index)
178                 return
179         }
180         copy(t.MetaData[(1<<14)*index:], data)
181         t.metadataHave[index] = true
182 }
183
184 func (t *torrent) metadataPieceCount() int {
185         return (len(t.MetaData) + (1 << 14) - 1) / (1 << 14)
186 }
187
188 func (t *torrent) haveMetadataPiece(piece int) bool {
189         if t.haveInfo() {
190                 return (1<<14)*piece < len(t.MetaData)
191         } else {
192                 return piece < len(t.metadataHave) && t.metadataHave[piece]
193         }
194 }
195
196 func (t *torrent) metadataSizeKnown() bool {
197         return t.MetaData != nil
198 }
199
200 func (t *torrent) metadataSize() int {
201         return len(t.MetaData)
202 }
203
204 func infoPieceHashes(info *metainfo.Info) (ret []string) {
205         for i := 0; i < len(info.Pieces); i += 20 {
206                 ret = append(ret, string(info.Pieces[i:i+20]))
207         }
208         return
209 }
210
211 // Called when metadata for a torrent becomes available.
212 func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte, eventLocker sync.Locker) (err error) {
213         err = validateInfo(md)
214         if err != nil {
215                 err = fmt.Errorf("bad info: %s", err)
216                 return
217         }
218         t.Info = md
219         t.length = 0
220         for _, f := range t.Info.UpvertedFiles() {
221                 t.length += f.Length
222         }
223         t.MetaData = infoBytes
224         t.metadataHave = nil
225         for _, hash := range infoPieceHashes(md) {
226                 piece := &piece{}
227                 piece.Event.L = eventLocker
228                 util.CopyExact(piece.Hash[:], hash)
229                 t.Pieces = append(t.Pieces, piece)
230         }
231         for _, conn := range t.Conns {
232                 t.initRequestOrdering(conn)
233                 if err := conn.setNumPieces(t.numPieces()); err != nil {
234                         log.Printf("closing connection: %s", err)
235                         conn.Close()
236                 }
237         }
238         return
239 }
240
241 func (t *torrent) setStorage(td data.Data) (err error) {
242         if c, ok := t.data.(io.Closer); ok {
243                 c.Close()
244         }
245         if sd, ok := td.(StatefulData); ok {
246                 t.data = sd
247         } else {
248                 t.data = &statelessDataWrapper{td, make([]bool, t.Info.NumPieces())}
249         }
250         return
251 }
252
253 func (t *torrent) haveAllMetadataPieces() bool {
254         if t.haveInfo() {
255                 return true
256         }
257         if t.metadataHave == nil {
258                 return false
259         }
260         for _, have := range t.metadataHave {
261                 if !have {
262                         return false
263                 }
264         }
265         return true
266 }
267
268 func (t *torrent) setMetadataSize(bytes int64, cl *Client) {
269         if t.haveInfo() {
270                 // We already know the correct metadata size.
271                 return
272         }
273         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
274                 log.Printf("received bad metadata size: %d", bytes)
275                 return
276         }
277         if t.MetaData != nil && len(t.MetaData) == int(bytes) {
278                 return
279         }
280         t.MetaData = make([]byte, bytes)
281         t.metadataHave = make([]bool, (bytes+(1<<14)-1)/(1<<14))
282         for _, c := range t.Conns {
283                 cl.requestPendingMetadata(t, c)
284         }
285
286 }
287
288 // The current working name for the torrent. Either the name in the info dict,
289 // or a display name given such as by the dn value in a magnet link, or "".
290 func (t *torrent) Name() string {
291         if t.haveInfo() {
292                 return t.Info.Name
293         }
294         if t.DisplayName != "" {
295                 return t.DisplayName
296         }
297         return ""
298 }
299
300 func (t *torrent) pieceState(index int) (ret PieceState) {
301         p := t.Pieces[index]
302         ret.Priority = p.Priority
303         if t.pieceComplete(index) {
304                 ret.Complete = true
305         }
306         if p.QueuedForHash || p.Hashing {
307                 ret.Checking = true
308         }
309         if t.piecePartiallyDownloaded(index) {
310                 ret.Partial = true
311         }
312         return
313 }
314
315 func (t *torrent) metadataPieceSize(piece int) int {
316         return metadataPieceSize(len(t.MetaData), piece)
317 }
318
319 func (t *torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
320         d := map[string]int{
321                 "msg_type": msgType,
322                 "piece":    piece,
323         }
324         if data != nil {
325                 d["total_size"] = len(t.MetaData)
326         }
327         p, err := bencode.Marshal(d)
328         if err != nil {
329                 panic(err)
330         }
331         return pp.Message{
332                 Type:            pp.Extended,
333                 ExtendedID:      byte(c.PeerExtensionIDs["ut_metadata"]),
334                 ExtendedPayload: append(p, data...),
335         }
336 }
337
338 func (t *torrent) pieceStateRuns() (ret []PieceStateRun) {
339         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
340                 ret = append(ret, PieceStateRun{
341                         PieceState: el.(PieceState),
342                         Length:     int(count),
343                 })
344         })
345         for index := range t.Pieces {
346                 rle.Append(t.pieceState(index), 1)
347         }
348         rle.Flush()
349         return
350 }
351
352 // Produces a small string representing a PieceStateRun.
353 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
354         ret = fmt.Sprintf("%d", psr.Length)
355         ret += func() string {
356                 switch psr.Priority {
357                 case PiecePriorityNext:
358                         return "N"
359                 case PiecePriorityNormal:
360                         return "."
361                 case PiecePriorityReadahead:
362                         return "R"
363                 case PiecePriorityNow:
364                         return "!"
365                 default:
366                         return ""
367                 }
368         }()
369         if psr.Checking {
370                 ret += "H"
371         }
372         if psr.Partial {
373                 ret += "P"
374         }
375         if psr.Complete {
376                 ret += "C"
377         }
378         return
379 }
380
381 func (t *torrent) writeStatus(w io.Writer, cl *Client) {
382         fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash)
383         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
384         fmt.Fprintf(w, "Metadata have: ")
385         for _, h := range t.metadataHave {
386                 fmt.Fprintf(w, "%c", func() rune {
387                         if h {
388                                 return 'H'
389                         } else {
390                                 return '.'
391                         }
392                 }())
393         }
394         fmt.Fprintln(w)
395         fmt.Fprintf(w, "Piece length: %s\n", func() string {
396                 if t.haveInfo() {
397                         return fmt.Sprint(t.usualPieceSize())
398                 } else {
399                         return "?"
400                 }
401         }())
402         if t.haveInfo() {
403                 fmt.Fprint(w, "Pieces:")
404                 for _, psr := range t.pieceStateRuns() {
405                         w.Write([]byte(" "))
406                         w.Write([]byte(pieceStateRunStatusChars(psr)))
407                 }
408                 fmt.Fprintln(w)
409         }
410         fmt.Fprintf(w, "Urgent:")
411         for req := range t.urgent {
412                 fmt.Fprintf(w, " %v", req)
413         }
414         fmt.Fprintln(w)
415         fmt.Fprintf(w, "Trackers: ")
416         for _, tier := range t.Trackers {
417                 for _, tr := range tier {
418                         fmt.Fprintf(w, "%q ", tr.String())
419                 }
420         }
421         fmt.Fprintf(w, "\n")
422         fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers))
423         fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen))
424         fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns))
425         sort.Sort(&worstConns{
426                 c:  t.Conns,
427                 t:  t,
428                 cl: cl,
429         })
430         for _, c := range t.Conns {
431                 c.WriteStatus(w, t)
432         }
433 }
434
435 func (t *torrent) String() string {
436         s := t.Name()
437         if s == "" {
438                 s = fmt.Sprintf("%x", t.InfoHash)
439         }
440         return s
441 }
442
443 func (t *torrent) haveInfo() bool {
444         return t.Info != nil
445 }
446
447 // TODO: Include URIs that weren't converted to tracker clients.
448 func (t *torrent) announceList() (al [][]string) {
449         for _, tier := range t.Trackers {
450                 var l []string
451                 for _, tr := range tier {
452                         l = append(l, tr.URL())
453                 }
454                 al = append(al, l)
455         }
456         return
457 }
458
459 // Returns a run-time generated MetaInfo that includes the info bytes and
460 // announce-list as currently known to the client.
461 func (t *torrent) MetaInfo() *metainfo.MetaInfo {
462         if t.MetaData == nil {
463                 panic("info bytes not set")
464         }
465         return &metainfo.MetaInfo{
466                 Info: metainfo.InfoEx{
467                         Info:  *t.Info,
468                         Bytes: t.MetaData,
469                 },
470                 CreationDate: time.Now().Unix(),
471                 Comment:      "dynamic metainfo from client",
472                 CreatedBy:    "go.torrent",
473                 AnnounceList: t.announceList(),
474         }
475 }
476
477 func (t *torrent) bytesLeft() (left int64) {
478         if !t.haveInfo() {
479                 return -1
480         }
481         for i := 0; i < t.numPieces(); i++ {
482                 left += int64(t.pieceNumPendingBytes(i))
483         }
484         return
485 }
486
487 func (t *torrent) piecePartiallyDownloaded(index int) bool {
488         pendingBytes := t.pieceNumPendingBytes(index)
489         return pendingBytes != 0 && pendingBytes != t.pieceLength(index)
490 }
491
492 func numChunksForPiece(chunkSize int, pieceSize int) int {
493         return (pieceSize + chunkSize - 1) / chunkSize
494 }
495
496 func (t *torrent) usualPieceSize() int {
497         return int(t.Info.PieceLength)
498 }
499
500 func (t *torrent) lastPieceSize() int {
501         return int(t.pieceLength(t.numPieces() - 1))
502 }
503
504 func (t *torrent) numPieces() int {
505         return t.Info.NumPieces()
506 }
507
508 func (t *torrent) numPiecesCompleted() (num int) {
509         for i := range iter.N(t.Info.NumPieces()) {
510                 if t.pieceComplete(i) {
511                         num++
512                 }
513         }
514         return
515 }
516
517 func (t *torrent) Length() int64 {
518         return t.length
519 }
520
521 func (t *torrent) isClosed() bool {
522         select {
523         case <-t.closing:
524                 return true
525         default:
526                 return false
527         }
528 }
529
530 func (t *torrent) close() (err error) {
531         if t.isClosed() {
532                 return
533         }
534         t.ceaseNetworking()
535         close(t.closing)
536         if c, ok := t.data.(io.Closer); ok {
537                 c.Close()
538         }
539         for _, conn := range t.Conns {
540                 conn.Close()
541         }
542         return
543 }
544
545 func (t *torrent) requestOffset(r request) int64 {
546         return torrentRequestOffset(t.Length(), int64(t.usualPieceSize()), r)
547 }
548
549 // Return the request that would include the given offset into the torrent
550 // data. Returns !ok if there is no such request.
551 func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
552         return torrentOffsetRequest(t.Length(), t.Info.PieceLength, chunkSize, off)
553 }
554
555 func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
556         n, err := t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
557         if err == nil && n != len(data) {
558                 err = io.ErrShortWrite
559         }
560         return
561 }
562
563 func (t *torrent) bitfield() (bf []bool) {
564         for _, p := range t.Pieces {
565                 // TODO: Check this logic.
566                 bf = append(bf, p.EverHashed && p.numPendingChunks() == 0)
567         }
568         return
569 }
570
571 func (t *torrent) validOutgoingRequest(r request) bool {
572         if r.Index >= pp.Integer(t.Info.NumPieces()) {
573                 return false
574         }
575         if r.Begin%chunkSize != 0 {
576                 return false
577         }
578         if r.Length > chunkSize {
579                 return false
580         }
581         pieceLength := t.pieceLength(int(r.Index))
582         if r.Begin+r.Length > pieceLength {
583                 return false
584         }
585         return r.Length == chunkSize || r.Begin+r.Length == pieceLength
586 }
587
588 func (t *torrent) pieceChunks(piece int) (css []chunkSpec) {
589         css = make([]chunkSpec, 0, (t.pieceLength(piece)+chunkSize-1)/chunkSize)
590         var cs chunkSpec
591         for left := t.pieceLength(piece); left != 0; left -= cs.Length {
592                 cs.Length = left
593                 if cs.Length > chunkSize {
594                         cs.Length = chunkSize
595                 }
596                 css = append(css, cs)
597                 cs.Begin += cs.Length
598         }
599         return
600 }
601
602 func (t *torrent) pendAllChunkSpecs(pieceIndex int) {
603         piece := t.Pieces[pieceIndex]
604         if piece.PendingChunkSpecs == nil {
605                 // Allocate to exact size.
606                 piece.PendingChunkSpecs = make([]bool, (t.pieceLength(pieceIndex)+chunkSize-1)/chunkSize)
607         }
608         // Pend all the chunks.
609         pcss := piece.PendingChunkSpecs
610         for i := range pcss {
611                 pcss[i] = true
612         }
613         return
614 }
615
616 type Peer struct {
617         Id     [20]byte
618         IP     net.IP
619         Port   int
620         Source peerSource
621         // Peer is known to support encryption.
622         SupportsEncryption bool
623 }
624
625 func (t *torrent) pieceLength(piece int) (len_ pp.Integer) {
626         if int(piece) == t.numPieces()-1 {
627                 len_ = pp.Integer(t.Length() % t.Info.PieceLength)
628         }
629         if len_ == 0 {
630                 len_ = pp.Integer(t.Info.PieceLength)
631         }
632         return
633 }
634
635 func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) {
636         hash := pieceHash.New()
637         t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
638         util.CopyExact(ps[:], hash.Sum(nil))
639         return
640 }
641
642 func (t *torrent) haveAllPieces() bool {
643         if !t.haveInfo() {
644                 return false
645         }
646         for i := range t.Pieces {
647                 if !t.pieceComplete(i) {
648                         return false
649                 }
650         }
651         return true
652 }
653
654 func (me *torrent) haveAnyPieces() bool {
655         for i := range me.Pieces {
656                 if me.pieceComplete(i) {
657                         return true
658                 }
659         }
660         return false
661 }
662
663 func (t *torrent) havePiece(index int) bool {
664         return t.haveInfo() && t.pieceComplete(index)
665 }
666
667 func (t *torrent) haveChunk(r request) bool {
668         if !t.haveInfo() {
669                 return false
670         }
671         return !t.Pieces[r.Index].pendingChunk(r.chunkSpec)
672 }
673
674 func chunkIndex(cs chunkSpec) int {
675         return int(cs.Begin / chunkSize)
676 }
677
678 // TODO: This should probably be called wantPiece.
679 func (t *torrent) wantChunk(r request) bool {
680         if !t.wantPiece(int(r.Index)) {
681                 return false
682         }
683         if t.Pieces[r.Index].pendingChunk(r.chunkSpec) {
684                 return true
685         }
686         _, ok := t.urgent[r]
687         return ok
688 }
689
690 func (t *torrent) urgentChunkInPiece(piece int) bool {
691         p := pp.Integer(piece)
692         for req := range t.urgent {
693                 if req.Index == p {
694                         return true
695                 }
696         }
697         return false
698 }
699
700 // TODO: This should be called wantPieceIndex.
701 func (t *torrent) wantPiece(index int) bool {
702         if !t.haveInfo() {
703                 return false
704         }
705         p := t.Pieces[index]
706         if p.QueuedForHash {
707                 return false
708         }
709         if p.Hashing {
710                 return false
711         }
712         if p.Priority == PiecePriorityNone {
713                 if !t.urgentChunkInPiece(index) {
714                         return false
715                 }
716         }
717         // Put piece complete check last, since it's the slowest as it can involve
718         // calling out into external data stores.
719         return !t.pieceComplete(index)
720 }
721
722 func (t *torrent) connHasWantedPieces(c *connection) bool {
723         return c.pieceRequestOrder != nil && c.pieceRequestOrder.First() != nil
724 }
725
726 func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
727         for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
728                 pieces = append(pieces, int(i))
729         }
730         return
731 }
732
733 func (t *torrent) worstBadConn(cl *Client) *connection {
734         wcs := t.worstConns(cl)
735         heap.Init(wcs)
736         // A connection can only be bad if it's in the worst half, rounded down.
737         for wcs.Len() > (socketsPerTorrent+1)/2 {
738                 c := heap.Pop(wcs).(*connection)
739                 // Give connections 1 minute to prove themselves.
740                 if time.Since(c.completedHandshake) < time.Minute {
741                         continue
742                 }
743                 return c
744         }
745         return nil
746 }