]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Encapsulate torrent data, and provide os.File and mmap-based implementations
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "fmt"
6         "io"
7         "log"
8         "net"
9         "sort"
10         "sync"
11         "time"
12
13         "bitbucket.org/anacrolix/go.torrent/data/file"
14
15         pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
16         "bitbucket.org/anacrolix/go.torrent/tracker"
17         "bitbucket.org/anacrolix/go.torrent/util"
18         "github.com/anacrolix/libtorgo/bencode"
19         "github.com/anacrolix/libtorgo/metainfo"
20 )
21
22 func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) {
23         piece := t.Pieces[index]
24         if !piece.EverHashed {
25                 return t.PieceLength(index)
26         }
27         pendingChunks := t.Pieces[index].PendingChunkSpecs
28         count = pp.Integer(len(pendingChunks)) * chunkSize
29         _lastChunkSpec := lastChunkSpec(t.PieceLength(index))
30         if _lastChunkSpec.Length != chunkSize {
31                 if _, ok := pendingChunks[_lastChunkSpec]; ok {
32                         count += _lastChunkSpec.Length - chunkSize
33                 }
34         }
35         return
36 }
37
38 type peersKey struct {
39         IPBytes string
40         Port    int
41 }
42
43 type torrentData interface {
44         ReadAt(p []byte, off int64) (n int, err error)
45         Close()
46         WriteAt(p []byte, off int64) (n int, err error)
47         WriteSectionTo(w io.Writer, off, n int64) (written int64, err error)
48 }
49
50 // Is not aware of Client.
51 type torrent struct {
52         stateMu sync.Mutex
53         closing chan struct{}
54
55         // Closed when no more network activity is desired. This includes
56         // announcing, and communicating with peers.
57         ceasingNetworking chan struct{}
58
59         InfoHash InfoHash
60         Pieces   []*piece
61         length   int64
62
63         // Prevent mutations to Data memory maps while in use as they're not safe.
64         dataLock sync.RWMutex
65         Data     torrentData
66
67         Info *MetaInfo
68         // Active peer connections.
69         Conns []*connection
70         // Set of addrs to which we're attempting to connect.
71         HalfOpen map[string]struct{}
72
73         // Reserve of peers to connect to. A peer can be both here and in the
74         // active connections if were told about the peer after connecting with
75         // them. That encourages us to reconnect to peers that are well known.
76         Peers     map[peersKey]Peer
77         wantPeers sync.Cond
78
79         // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
80         // mirror their respective URLs from the announce-list key.
81         Trackers     [][]tracker.Client
82         DisplayName  string
83         MetaData     []byte
84         metadataHave []bool
85
86         gotMetainfo chan struct{}
87         GotMetainfo <-chan struct{}
88 }
89
90 func (t *torrent) numConnsUnchoked() (num int) {
91         for _, c := range t.Conns {
92                 if !c.PeerChoked {
93                         num++
94                 }
95         }
96         return
97 }
98
99 func (t *torrent) addrActive(addr string) bool {
100         if _, ok := t.HalfOpen[addr]; ok {
101                 return true
102         }
103         for _, c := range t.Conns {
104                 if c.Socket.RemoteAddr().String() == addr {
105                         return true
106                 }
107         }
108         return false
109 }
110
111 func (t *torrent) worstConnsHeap() (wcs *worstConns) {
112         wcs = &worstConns{
113                 c: append([]*connection{}, t.Conns...),
114                 t: t,
115         }
116         heap.Init(wcs)
117         return
118 }
119
120 func (t *torrent) ceaseNetworking() {
121         t.stateMu.Lock()
122         defer t.stateMu.Unlock()
123         select {
124         case <-t.ceasingNetworking:
125                 return
126         default:
127         }
128         close(t.ceasingNetworking)
129         for _, c := range t.Conns {
130                 c.Close()
131         }
132 }
133
134 func (t *torrent) AddPeers(pp []Peer) {
135         for _, p := range pp {
136                 t.Peers[peersKey{string(p.IP), p.Port}] = p
137         }
138 }
139
140 func (t *torrent) InvalidateMetadata() {
141         t.MetaData = nil
142         t.metadataHave = nil
143         t.Info = nil
144 }
145
146 func (t *torrent) SaveMetadataPiece(index int, data []byte) {
147         if t.haveInfo() {
148                 return
149         }
150         if index >= len(t.metadataHave) {
151                 log.Printf("%s: ignoring metadata piece %d", t, index)
152                 return
153         }
154         copy(t.MetaData[(1<<14)*index:], data)
155         t.metadataHave[index] = true
156 }
157
158 func (t *torrent) MetadataPieceCount() int {
159         return (len(t.MetaData) + (1 << 14) - 1) / (1 << 14)
160 }
161
162 func (t *torrent) HaveMetadataPiece(piece int) bool {
163         if t.haveInfo() {
164                 return (1<<14)*piece < len(t.MetaData)
165         } else {
166                 return piece < len(t.metadataHave) && t.metadataHave[piece]
167         }
168 }
169
170 func (t *torrent) metadataSizeKnown() bool {
171         return t.MetaData != nil
172 }
173
174 func (t *torrent) metadataSize() int {
175         return len(t.MetaData)
176 }
177
178 func infoPieceHashes(info *metainfo.Info) (ret []string) {
179         for i := 0; i < len(info.Pieces); i += 20 {
180                 ret = append(ret, string(info.Pieces[i:i+20]))
181         }
182         return
183 }
184
185 // Called when metadata for a torrent becomes available.
186 func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte, eventLocker sync.Locker) (err error) {
187         t.Info = newMetaInfo(&md)
188         t.length = 0
189         for _, f := range t.Info.UpvertedFiles() {
190                 t.length += f.Length
191         }
192         t.MetaData = infoBytes
193         t.metadataHave = nil
194         for _, hash := range infoPieceHashes(&md) {
195                 piece := &piece{}
196                 piece.Event.L = eventLocker
197                 util.CopyExact(piece.Hash[:], hash)
198                 t.Pieces = append(t.Pieces, piece)
199         }
200         for _, conn := range t.Conns {
201                 t.initRequestOrdering(conn)
202                 if err := conn.setNumPieces(t.numPieces()); err != nil {
203                         log.Printf("closing connection: %s", err)
204                         conn.Close()
205                 }
206         }
207         t.Data, err = file.TorrentData(&md, dataDir)
208         if err != nil {
209                 err = fmt.Errorf("error mmap'ing torrent data: %s", err)
210                 return
211         }
212         return
213 }
214
215 func (t *torrent) HaveAllMetadataPieces() bool {
216         if t.haveInfo() {
217                 return true
218         }
219         if t.metadataHave == nil {
220                 return false
221         }
222         for _, have := range t.metadataHave {
223                 if !have {
224                         return false
225                 }
226         }
227         return true
228 }
229
230 func (t *torrent) SetMetadataSize(bytes int64) {
231         if t.MetaData != nil {
232                 return
233         }
234         if bytes > 10000000 { // 10MB, pulled from my ass.
235                 return
236         }
237         t.MetaData = make([]byte, bytes)
238         t.metadataHave = make([]bool, (bytes+(1<<14)-1)/(1<<14))
239 }
240
241 func (t *torrent) Name() string {
242         if t.haveInfo() {
243                 return t.Info.Name
244         }
245         if t.DisplayName != "" {
246                 return t.DisplayName
247         }
248         return t.InfoHash.HexString()
249 }
250
251 func (t *torrent) pieceStatusChar(index int) byte {
252         p := t.Pieces[index]
253         switch {
254         case p.Complete():
255                 return 'C'
256         case p.QueuedForHash:
257                 return 'Q'
258         case p.Hashing:
259                 return 'H'
260         case !p.EverHashed:
261                 return '?'
262         case t.PiecePartiallyDownloaded(index):
263                 switch p.Priority {
264                 case piecePriorityNone:
265                         return 'F' // Forgotten
266                 default:
267                         return 'P'
268                 }
269         default:
270                 switch p.Priority {
271                 case piecePriorityNone:
272                         return 'z'
273                 case piecePriorityNow:
274                         return '!'
275                 case piecePriorityReadahead:
276                         return 'R'
277                 case piecePriorityNext:
278                         return 'N'
279                 default:
280                         return '.'
281                 }
282         }
283 }
284
285 func (t *torrent) metadataPieceSize(piece int) int {
286         return metadataPieceSize(len(t.MetaData), piece)
287 }
288
289 func (t *torrent) NewMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
290         d := map[string]int{
291                 "msg_type": msgType,
292                 "piece":    piece,
293         }
294         if data != nil {
295                 d["total_size"] = len(t.MetaData)
296         }
297         p, err := bencode.Marshal(d)
298         if err != nil {
299                 panic(err)
300         }
301         return pp.Message{
302                 Type:            pp.Extended,
303                 ExtendedID:      byte(c.PeerExtensionIDs["ut_metadata"]),
304                 ExtendedPayload: append(p, data...),
305         }
306 }
307
308 type PieceStatusCharSequence struct {
309         Char  byte
310         Count int
311 }
312
313 func (t *torrent) PieceStatusCharSequences() []PieceStatusCharSequence {
314         t.stateMu.Lock()
315         defer t.stateMu.Unlock()
316         return t.pieceStatusCharSequences()
317 }
318
319 // Returns the length of sequences of identical piece status chars.
320 func (t *torrent) pieceStatusCharSequences() (ret []PieceStatusCharSequence) {
321         var (
322                 char  byte
323                 count int
324         )
325         writeSequence := func() {
326                 ret = append(ret, PieceStatusCharSequence{char, count})
327         }
328         if len(t.Pieces) != 0 {
329                 char = t.pieceStatusChar(0)
330         }
331         for index := range t.Pieces {
332                 char1 := t.pieceStatusChar(index)
333                 if char1 == char {
334                         count++
335                 } else {
336                         writeSequence()
337                         char = char1
338                         count = 1
339                 }
340         }
341         if count != 0 {
342                 writeSequence()
343         }
344         return
345 }
346
347 func (t *torrent) WriteStatus(w io.Writer) {
348         fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash)
349         fmt.Fprintf(w, "Piece length: %s\n", func() string {
350                 if t.haveInfo() {
351                         return fmt.Sprint(t.UsualPieceSize())
352                 } else {
353                         return "?"
354                 }
355         }())
356         if t.haveInfo() {
357                 fmt.Fprint(w, "Pieces: ")
358                 for _, seq := range t.pieceStatusCharSequences() {
359                         fmt.Fprintf(w, "%d%c ", seq.Count, seq.Char)
360                 }
361                 fmt.Fprintln(w)
362         }
363         fmt.Fprintf(w, "Trackers: ")
364         for _, tier := range t.Trackers {
365                 for _, tr := range tier {
366                         fmt.Fprintf(w, "%q ", tr.String())
367                 }
368         }
369         fmt.Fprintf(w, "\n")
370         fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers))
371         fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen))
372         fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns))
373         sort.Sort(&worstConns{
374                 c: t.Conns,
375                 t: t,
376         })
377         for _, c := range t.Conns {
378                 c.WriteStatus(w)
379         }
380 }
381
382 func (t *torrent) String() string {
383         s := t.Name()
384         if s == "" {
385                 s = fmt.Sprintf("%x", t.InfoHash)
386         }
387         return s
388 }
389
390 func (t *torrent) haveInfo() bool {
391         return t.Info != nil
392 }
393
394 // TODO: Include URIs that weren't converted to tracker clients.
395 func (t *torrent) AnnounceList() (al [][]string) {
396         for _, tier := range t.Trackers {
397                 var l []string
398                 for _, tr := range tier {
399                         l = append(l, tr.URL())
400                 }
401                 al = append(al, l)
402         }
403         return
404 }
405
406 func (t *torrent) MetaInfo() *metainfo.MetaInfo {
407         if t.MetaData == nil {
408                 panic("info bytes not set")
409         }
410         return &metainfo.MetaInfo{
411                 Info: metainfo.InfoEx{
412                         Info:  *t.Info.Info,
413                         Bytes: t.MetaData,
414                 },
415                 CreationDate: time.Now().Unix(),
416                 Comment:      "dynamic metainfo from client",
417                 CreatedBy:    "go.torrent",
418                 AnnounceList: t.AnnounceList(),
419         }
420 }
421
422 func (t *torrent) BytesLeft() (left int64) {
423         if !t.haveInfo() {
424                 return -1
425         }
426         for i := pp.Integer(0); i < pp.Integer(t.numPieces()); i++ {
427                 left += int64(t.PieceNumPendingBytes(i))
428         }
429         return
430 }
431
432 func (t *torrent) PiecePartiallyDownloaded(index int) bool {
433         return t.PieceNumPendingBytes(pp.Integer(index)) != t.PieceLength(pp.Integer(index))
434 }
435
436 func NumChunksForPiece(chunkSize int, pieceSize int) int {
437         return (pieceSize + chunkSize - 1) / chunkSize
438 }
439
440 func (t *torrent) UsualPieceSize() int {
441         return int(t.Info.PieceLength)
442 }
443
444 func (t *torrent) LastPieceSize() int {
445         return int(t.PieceLength(pp.Integer(t.numPieces() - 1)))
446 }
447
448 func (t *torrent) numPieces() int {
449         return len(t.Info.Pieces) / 20
450 }
451
452 func (t *torrent) NumPiecesCompleted() (num int) {
453         for _, p := range t.Pieces {
454                 if p.Complete() {
455                         num++
456                 }
457         }
458         return
459 }
460
461 func (t *torrent) Length() int64 {
462         return t.length
463 }
464
465 func (t *torrent) isClosed() bool {
466         select {
467         case <-t.closing:
468                 return true
469         default:
470                 return false
471         }
472 }
473
474 func (t *torrent) close() (err error) {
475         if t.isClosed() {
476                 return
477         }
478         t.ceaseNetworking()
479         close(t.closing)
480         t.dataLock.Lock()
481         if t.Data != nil {
482                 t.Data.Close()
483                 t.Data = nil
484         }
485         t.dataLock.Unlock()
486         for _, conn := range t.Conns {
487                 conn.Close()
488         }
489         return
490 }
491
492 // Return the request that would include the given offset into the torrent data.
493 func torrentOffsetRequest(torrentLength, pieceSize, chunkSize, offset int64) (
494         r request, ok bool) {
495         if offset < 0 || offset >= torrentLength {
496                 return
497         }
498         r.Index = pp.Integer(offset / pieceSize)
499         r.Begin = pp.Integer(offset % pieceSize / chunkSize * chunkSize)
500         left := torrentLength - int64(r.Index)*pieceSize - int64(r.Begin)
501         if chunkSize < left {
502                 r.Length = pp.Integer(chunkSize)
503         } else {
504                 r.Length = pp.Integer(left)
505         }
506         ok = true
507         return
508 }
509
510 func torrentRequestOffset(torrentLength, pieceSize int64, r request) (off int64) {
511         off = int64(r.Index)*pieceSize + int64(r.Begin)
512         if off < 0 || off >= torrentLength {
513                 panic("invalid request")
514         }
515         return
516 }
517
518 func (t *torrent) requestOffset(r request) int64 {
519         return torrentRequestOffset(t.Length(), int64(t.UsualPieceSize()), r)
520 }
521
522 // Return the request that would include the given offset into the torrent data.
523 func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
524         return torrentOffsetRequest(t.Length(), t.Info.PieceLength, chunkSize, off)
525 }
526
527 func (t *torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
528         _, err = t.Data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
529         return
530 }
531
532 func (t *torrent) bitfield() (bf []bool) {
533         for _, p := range t.Pieces {
534                 bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0)
535         }
536         return
537 }
538
539 func (t *torrent) pieceChunks(piece int) (css []chunkSpec) {
540         css = make([]chunkSpec, 0, (t.PieceLength(pp.Integer(piece))+chunkSize-1)/chunkSize)
541         var cs chunkSpec
542         for left := t.PieceLength(pp.Integer(piece)); left != 0; left -= cs.Length {
543                 cs.Length = left
544                 if cs.Length > chunkSize {
545                         cs.Length = chunkSize
546                 }
547                 css = append(css, cs)
548                 cs.Begin += cs.Length
549         }
550         return
551 }
552
553 func (t *torrent) pendAllChunkSpecs(index pp.Integer) {
554         piece := t.Pieces[index]
555         if piece.PendingChunkSpecs == nil {
556                 piece.PendingChunkSpecs = make(
557                         map[chunkSpec]struct{},
558                         (t.PieceLength(index)+chunkSize-1)/chunkSize)
559         }
560         pcss := piece.PendingChunkSpecs
561         for _, cs := range t.pieceChunks(int(index)) {
562                 pcss[cs] = struct{}{}
563         }
564         return
565 }
566
567 type Peer struct {
568         Id     [20]byte
569         IP     net.IP
570         Port   int
571         Source peerSource
572 }
573
574 func (t *torrent) PieceLength(piece pp.Integer) (len_ pp.Integer) {
575         if int(piece) == t.numPieces()-1 {
576                 len_ = pp.Integer(t.Length() % t.Info.PieceLength)
577         }
578         if len_ == 0 {
579                 len_ = pp.Integer(t.Info.PieceLength)
580         }
581         return
582 }
583
584 func (t *torrent) HashPiece(piece pp.Integer) (ps pieceSum) {
585         hash := pieceHash.New()
586         t.dataLock.RLock()
587         t.Data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
588         t.dataLock.RUnlock()
589         util.CopyExact(ps[:], hash.Sum(nil))
590         return
591 }
592 func (t *torrent) haveAllPieces() bool {
593         if !t.haveInfo() {
594                 return false
595         }
596         for _, piece := range t.Pieces {
597                 if !piece.Complete() {
598                         return false
599                 }
600         }
601         return true
602 }
603
604 func (me *torrent) haveAnyPieces() bool {
605         for _, piece := range me.Pieces {
606                 if piece.Complete() {
607                         return true
608                 }
609         }
610         return false
611 }
612
613 func (t *torrent) havePiece(index int) bool {
614         return t.haveInfo() && t.Pieces[index].Complete()
615 }
616
617 func (t *torrent) haveChunk(r request) bool {
618         p := t.Pieces[r.Index]
619         if !p.EverHashed {
620                 return false
621         }
622         _, ok := p.PendingChunkSpecs[r.chunkSpec]
623         return !ok
624 }
625
626 func (t *torrent) wantChunk(r request) bool {
627         if !t.wantPiece(int(r.Index)) {
628                 return false
629         }
630         _, ok := t.Pieces[r.Index].PendingChunkSpecs[r.chunkSpec]
631         return ok
632 }
633
634 func (t *torrent) wantPiece(index int) bool {
635         if !t.haveInfo() {
636                 return false
637         }
638         p := t.Pieces[index]
639         return p.EverHashed && len(p.PendingChunkSpecs) != 0 && p.Priority != piecePriorityNone
640 }
641
642 func (t *torrent) connHasWantedPieces(c *connection) bool {
643         for p := range t.Pieces {
644                 if t.wantPiece(p) && c.PeerHasPiece(pp.Integer(p)) {
645                         return true
646                 }
647         }
648         return false
649 }
650
651 func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
652         for i := off / int64(t.UsualPieceSize()); i*int64(t.UsualPieceSize()) < off+_len; i++ {
653                 pieces = append(pieces, int(i))
654         }
655         return
656 }