]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
ec3624546d78a0e75f70892c5121aa98d9a34639
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "errors"
6         "fmt"
7         "io"
8         "log"
9         "math"
10         "math/rand"
11         "net"
12         "os"
13         "sort"
14         "sync"
15         "time"
16
17         "github.com/anacrolix/missinggo"
18         "github.com/anacrolix/missinggo/bitmap"
19         "github.com/anacrolix/missinggo/itertools"
20         "github.com/anacrolix/missinggo/perf"
21         "github.com/anacrolix/missinggo/pubsub"
22         "github.com/bradfitz/iter"
23
24         "github.com/anacrolix/torrent/bencode"
25         "github.com/anacrolix/torrent/metainfo"
26         pp "github.com/anacrolix/torrent/peer_protocol"
27         "github.com/anacrolix/torrent/storage"
28         "github.com/anacrolix/torrent/tracker"
29 )
30
31 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
32         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
33 }
34
35 type peersKey struct {
36         IPBytes string
37         Port    int
38 }
39
40 // Maintains state of torrent within a Client.
41 type Torrent struct {
42         cl *Client
43
44         closed   missinggo.Event
45         infoHash metainfo.Hash
46         pieces   []piece
47         // Values are the piece indices that changed.
48         pieceStateChanges *pubsub.PubSub
49         chunkSize         pp.Integer
50         // Total length of the torrent in bytes. Stored because it's not O(1) to
51         // get this from the info dict.
52         length int64
53
54         // The storage to open when the info dict becomes available.
55         storageOpener storage.Client
56         // Storage for torrent data.
57         storage storage.Torrent
58
59         // The info dict. nil if we don't have it (yet).
60         info *metainfo.InfoEx
61         // Active peer connections, running message stream loops.
62         conns []*connection
63         // Set of addrs to which we're attempting to connect. Connections are
64         // half-open until all handshakes are completed.
65         halfOpen map[string]struct{}
66
67         // Reserve of peers to connect to. A peer can be both here and in the
68         // active connections if were told about the peer after connecting with
69         // them. That encourages us to reconnect to peers that are well known.
70         peers     map[peersKey]Peer
71         wantPeers sync.Cond
72
73         // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
74         // mirror their respective URLs from the announce-list metainfo key.
75         trackers []trackerTier
76         // Name used if the info name isn't available.
77         displayName string
78         // The bencoded bytes of the info dict.
79         metadataBytes []byte
80         // Each element corresponds to the 16KiB metadata pieces. If true, we have
81         // received that piece.
82         metadataCompletedChunks []bool
83
84         // Set when .Info is obtained.
85         gotMetainfo missinggo.Event
86
87         readers map[*Reader]struct{}
88
89         pendingPieces   bitmap.Bitmap
90         completedPieces bitmap.Bitmap
91
92         connPieceInclinationPool sync.Pool
93 }
94
95 func (t *Torrent) setDisplayName(dn string) {
96         t.displayName = dn
97 }
98
99 func (t *Torrent) pieceComplete(piece int) bool {
100         return t.completedPieces.Get(piece)
101 }
102
103 func (t *Torrent) pieceCompleteUncached(piece int) bool {
104         return t.pieces[piece].Storage().GetIsComplete()
105 }
106
107 func (t *Torrent) numConnsUnchoked() (num int) {
108         for _, c := range t.conns {
109                 if !c.PeerChoked {
110                         num++
111                 }
112         }
113         return
114 }
115
116 // There's a connection to that address already.
117 func (t *Torrent) addrActive(addr string) bool {
118         if _, ok := t.halfOpen[addr]; ok {
119                 return true
120         }
121         for _, c := range t.conns {
122                 if c.remoteAddr().String() == addr {
123                         return true
124                 }
125         }
126         return false
127 }
128
129 func (t *Torrent) worstConns(cl *Client) (wcs *worstConns) {
130         wcs = &worstConns{
131                 c:  make([]*connection, 0, len(t.conns)),
132                 t:  t,
133                 cl: cl,
134         }
135         for _, c := range t.conns {
136                 if !c.closed.IsSet() {
137                         wcs.c = append(wcs.c, c)
138                 }
139         }
140         return
141 }
142
143 func (t *Torrent) addPeer(p Peer, cl *Client) {
144         cl.openNewConns(t)
145         if len(t.peers) >= torrentPeersHighWater {
146                 return
147         }
148         key := peersKey{string(p.IP), p.Port}
149         if _, ok := t.peers[key]; ok {
150                 return
151         }
152         t.peers[key] = p
153         peersAddedBySource.Add(string(p.Source), 1)
154         cl.openNewConns(t)
155
156 }
157
158 func (t *Torrent) invalidateMetadata() {
159         t.metadataBytes = nil
160         t.metadataCompletedChunks = nil
161         t.info = nil
162 }
163
164 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
165         if t.haveInfo() {
166                 return
167         }
168         if index >= len(t.metadataCompletedChunks) {
169                 log.Printf("%s: ignoring metadata piece %d", t, index)
170                 return
171         }
172         copy(t.metadataBytes[(1<<14)*index:], data)
173         t.metadataCompletedChunks[index] = true
174 }
175
176 func (t *Torrent) metadataPieceCount() int {
177         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
178 }
179
180 func (t *Torrent) haveMetadataPiece(piece int) bool {
181         if t.haveInfo() {
182                 return (1<<14)*piece < len(t.metadataBytes)
183         } else {
184                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
185         }
186 }
187
188 func (t *Torrent) metadataSizeKnown() bool {
189         return t.metadataBytes != nil
190 }
191
192 func (t *Torrent) metadataSize() int {
193         return len(t.metadataBytes)
194 }
195
196 func infoPieceHashes(info *metainfo.Info) (ret []string) {
197         for i := 0; i < len(info.Pieces); i += 20 {
198                 ret = append(ret, string(info.Pieces[i:i+20]))
199         }
200         return
201 }
202
203 // Called when metadata for a torrent becomes available.
204 func (t *Torrent) setInfoBytes(b []byte) error {
205         if t.haveInfo() {
206                 return nil
207         }
208         var ie *metainfo.InfoEx
209         err := bencode.Unmarshal(b, &ie)
210         if err != nil {
211                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
212         }
213         if ie.Hash() != t.infoHash {
214                 return errors.New("info bytes have wrong hash")
215         }
216         err = validateInfo(&ie.Info)
217         if err != nil {
218                 return fmt.Errorf("bad info: %s", err)
219         }
220         t.info = ie
221         t.cl.event.Broadcast()
222         t.gotMetainfo.Set()
223         t.storage, err = t.storageOpener.OpenTorrent(t.info)
224         if err != nil {
225                 return fmt.Errorf("error opening torrent storage: %s", err)
226         }
227         t.length = 0
228         for _, f := range t.info.UpvertedFiles() {
229                 t.length += f.Length
230         }
231         t.metadataBytes = b
232         t.metadataCompletedChunks = nil
233         hashes := infoPieceHashes(&t.info.Info)
234         t.pieces = make([]piece, len(hashes))
235         for i, hash := range hashes {
236                 piece := &t.pieces[i]
237                 piece.t = t
238                 piece.index = i
239                 piece.noPendingWrites.L = &piece.pendingWritesMutex
240                 missinggo.CopyExact(piece.Hash[:], hash)
241         }
242         for _, conn := range t.conns {
243                 if err := conn.setNumPieces(t.numPieces()); err != nil {
244                         log.Printf("closing connection: %s", err)
245                         conn.Close()
246                 }
247         }
248         for i := range t.pieces {
249                 t.updatePieceCompletion(i)
250                 t.pieces[i].QueuedForHash = true
251         }
252         go func() {
253                 for i := range t.pieces {
254                         t.verifyPiece(i)
255                 }
256         }()
257         return nil
258 }
259
260 func (t *Torrent) verifyPiece(piece int) {
261         t.cl.verifyPiece(t, piece)
262 }
263
264 func (t *Torrent) haveAllMetadataPieces() bool {
265         if t.haveInfo() {
266                 return true
267         }
268         if t.metadataCompletedChunks == nil {
269                 return false
270         }
271         for _, have := range t.metadataCompletedChunks {
272                 if !have {
273                         return false
274                 }
275         }
276         return true
277 }
278
279 // TODO: Propagate errors to disconnect peer.
280 func (t *Torrent) setMetadataSize(bytes int64) (err error) {
281         if t.haveInfo() {
282                 // We already know the correct metadata size.
283                 return
284         }
285         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
286                 return errors.New("bad size")
287         }
288         if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
289                 return
290         }
291         t.metadataBytes = make([]byte, bytes)
292         t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
293         for _, c := range t.conns {
294                 c.requestPendingMetadata()
295         }
296         return
297 }
298
299 // The current working name for the torrent. Either the name in the info dict,
300 // or a display name given such as by the dn value in a magnet link, or "".
301 func (t *Torrent) name() string {
302         if t.haveInfo() {
303                 return t.info.Name
304         }
305         return t.displayName
306 }
307
308 func (t *Torrent) pieceState(index int) (ret PieceState) {
309         p := &t.pieces[index]
310         ret.Priority = t.piecePriority(index)
311         if t.pieceComplete(index) {
312                 ret.Complete = true
313         }
314         if p.QueuedForHash || p.Hashing {
315                 ret.Checking = true
316         }
317         if !ret.Complete && t.piecePartiallyDownloaded(index) {
318                 ret.Partial = true
319         }
320         return
321 }
322
323 func (t *Torrent) metadataPieceSize(piece int) int {
324         return metadataPieceSize(len(t.metadataBytes), piece)
325 }
326
327 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
328         d := map[string]int{
329                 "msg_type": msgType,
330                 "piece":    piece,
331         }
332         if data != nil {
333                 d["total_size"] = len(t.metadataBytes)
334         }
335         p, err := bencode.Marshal(d)
336         if err != nil {
337                 panic(err)
338         }
339         return pp.Message{
340                 Type:            pp.Extended,
341                 ExtendedID:      c.PeerExtensionIDs["ut_metadata"],
342                 ExtendedPayload: append(p, data...),
343         }
344 }
345
346 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
347         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
348                 ret = append(ret, PieceStateRun{
349                         PieceState: el.(PieceState),
350                         Length:     int(count),
351                 })
352         })
353         for index := range t.pieces {
354                 rle.Append(t.pieceState(index), 1)
355         }
356         rle.Flush()
357         return
358 }
359
360 // Produces a small string representing a PieceStateRun.
361 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
362         ret = fmt.Sprintf("%d", psr.Length)
363         ret += func() string {
364                 switch psr.Priority {
365                 case PiecePriorityNext:
366                         return "N"
367                 case PiecePriorityNormal:
368                         return "."
369                 case PiecePriorityReadahead:
370                         return "R"
371                 case PiecePriorityNow:
372                         return "!"
373                 default:
374                         return ""
375                 }
376         }()
377         if psr.Checking {
378                 ret += "H"
379         }
380         if psr.Partial {
381                 ret += "P"
382         }
383         if psr.Complete {
384                 ret += "C"
385         }
386         return
387 }
388
389 func (t *Torrent) writeStatus(w io.Writer, cl *Client) {
390         fmt.Fprintf(w, "Infohash: %x\n", t.infoHash)
391         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
392         if !t.haveInfo() {
393                 fmt.Fprintf(w, "Metadata have: ")
394                 for _, h := range t.metadataCompletedChunks {
395                         fmt.Fprintf(w, "%c", func() rune {
396                                 if h {
397                                         return 'H'
398                                 } else {
399                                         return '.'
400                                 }
401                         }())
402                 }
403                 fmt.Fprintln(w)
404         }
405         fmt.Fprintf(w, "Piece length: %s\n", func() string {
406                 if t.haveInfo() {
407                         return fmt.Sprint(t.usualPieceSize())
408                 } else {
409                         return "?"
410                 }
411         }())
412         if t.haveInfo() {
413                 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
414                 fmt.Fprint(w, "Piece States:")
415                 for _, psr := range t.pieceStateRuns() {
416                         w.Write([]byte(" "))
417                         w.Write([]byte(pieceStateRunStatusChars(psr)))
418                 }
419                 fmt.Fprintln(w)
420         }
421         fmt.Fprintf(w, "Reader Pieces:")
422         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
423                 fmt.Fprintf(w, " %d:%d", begin, end)
424                 return true
425         })
426         fmt.Fprintln(w)
427         fmt.Fprintf(w, "Trackers: ")
428         for _, tier := range t.trackers {
429                 for _, tr := range tier {
430                         fmt.Fprintf(w, "%q ", tr)
431                 }
432         }
433         fmt.Fprintf(w, "\n")
434         fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
435         fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
436         fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
437         sort.Sort(&worstConns{
438                 c:  t.conns,
439                 t:  t,
440                 cl: cl,
441         })
442         for i, c := range t.conns {
443                 fmt.Fprintf(w, "%2d. ", i+1)
444                 c.WriteStatus(w, t)
445         }
446 }
447
448 func (t *Torrent) haveInfo() bool {
449         return t.info != nil
450 }
451
452 // TODO: Include URIs that weren't converted to tracker clients.
453 func (t *Torrent) announceList() (al [][]string) {
454         missinggo.CastSlice(&al, t.trackers)
455         return
456 }
457
458 // Returns a run-time generated MetaInfo that includes the info bytes and
459 // announce-list as currently known to the client.
460 func (t *Torrent) metainfo() *metainfo.MetaInfo {
461         if t.metadataBytes == nil {
462                 panic("info bytes not set")
463         }
464         return &metainfo.MetaInfo{
465                 Info:         *t.info,
466                 CreationDate: time.Now().Unix(),
467                 Comment:      "dynamic metainfo from client",
468                 CreatedBy:    "go.torrent",
469                 AnnounceList: t.announceList(),
470         }
471 }
472
473 func (t *Torrent) bytesLeft() (left int64) {
474         for i := 0; i < t.numPieces(); i++ {
475                 left += int64(t.pieces[i].bytesLeft())
476         }
477         return
478 }
479
480 // Bytes left to give in tracker announces.
481 func (t *Torrent) bytesLeftAnnounce() uint64 {
482         if t.haveInfo() {
483                 return uint64(t.bytesLeft())
484         } else {
485                 return math.MaxUint64
486         }
487 }
488
489 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
490         if t.pieceComplete(piece) {
491                 return false
492         }
493         if t.pieceAllDirty(piece) {
494                 return false
495         }
496         return t.pieces[piece].hasDirtyChunks()
497 }
498
499 func (t *Torrent) usualPieceSize() int {
500         return int(t.info.PieceLength)
501 }
502
503 func (t *Torrent) lastPieceSize() int {
504         return int(t.pieceLength(t.numPieces() - 1))
505 }
506
507 func (t *Torrent) numPieces() int {
508         return t.info.NumPieces()
509 }
510
511 func (t *Torrent) numPiecesCompleted() (num int) {
512         return t.completedPieces.Len()
513 }
514
515 func (t *Torrent) close() (err error) {
516         t.closed.Set()
517         if c, ok := t.storage.(io.Closer); ok {
518                 c.Close()
519         }
520         for _, conn := range t.conns {
521                 conn.Close()
522         }
523         t.pieceStateChanges.Close()
524         return
525 }
526
527 func (t *Torrent) requestOffset(r request) int64 {
528         return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
529 }
530
531 // Return the request that would include the given offset into the torrent
532 // data. Returns !ok if there is no such request.
533 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
534         return torrentOffsetRequest(t.length, t.info.PieceLength, int64(t.chunkSize), off)
535 }
536
537 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
538         tr := perf.NewTimer()
539
540         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
541         if err == nil && n != len(data) {
542                 err = io.ErrShortWrite
543         }
544         if err == nil {
545                 tr.Stop("write chunk")
546         }
547         return
548 }
549
550 func (t *Torrent) bitfield() (bf []bool) {
551         bf = make([]bool, t.numPieces())
552         t.completedPieces.IterTyped(func(piece int) (again bool) {
553                 bf[piece] = true
554                 return true
555         })
556         return
557 }
558
559 func (t *Torrent) validOutgoingRequest(r request) bool {
560         if r.Index >= pp.Integer(t.info.NumPieces()) {
561                 return false
562         }
563         if r.Begin%t.chunkSize != 0 {
564                 return false
565         }
566         if r.Length > t.chunkSize {
567                 return false
568         }
569         pieceLength := t.pieceLength(int(r.Index))
570         if r.Begin+r.Length > pieceLength {
571                 return false
572         }
573         return r.Length == t.chunkSize || r.Begin+r.Length == pieceLength
574 }
575
576 func (t *Torrent) pieceChunks(piece int) (css []chunkSpec) {
577         css = make([]chunkSpec, 0, (t.pieceLength(piece)+t.chunkSize-1)/t.chunkSize)
578         var cs chunkSpec
579         for left := t.pieceLength(piece); left != 0; left -= cs.Length {
580                 cs.Length = left
581                 if cs.Length > t.chunkSize {
582                         cs.Length = t.chunkSize
583                 }
584                 css = append(css, cs)
585                 cs.Begin += cs.Length
586         }
587         return
588 }
589
590 func (t *Torrent) pieceNumChunks(piece int) int {
591         return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
592 }
593
594 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
595         t.pieces[pieceIndex].DirtyChunks.Clear()
596 }
597
598 type Peer struct {
599         Id     [20]byte
600         IP     net.IP
601         Port   int
602         Source peerSource
603         // Peer is known to support encryption.
604         SupportsEncryption bool
605 }
606
607 func (t *Torrent) pieceLength(piece int) (len_ pp.Integer) {
608         if piece < 0 || piece >= t.info.NumPieces() {
609                 return
610         }
611         if piece == t.numPieces()-1 {
612                 len_ = pp.Integer(t.length % t.info.PieceLength)
613         }
614         if len_ == 0 {
615                 len_ = pp.Integer(t.info.PieceLength)
616         }
617         return
618 }
619
620 func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
621         hash := pieceHash.New()
622         p := &t.pieces[piece]
623         p.waitNoPendingWrites()
624         ip := t.info.Piece(piece)
625         pl := ip.Length()
626         n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
627         if n == pl {
628                 missinggo.CopyExact(&ret, hash.Sum(nil))
629                 return
630         }
631         if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
632                 log.Printf("unexpected error hashing piece with %T: %s", t.storage, err)
633         }
634         return
635 }
636
637 func (t *Torrent) haveAllPieces() bool {
638         if !t.haveInfo() {
639                 return false
640         }
641         return t.completedPieces.Len() == t.numPieces()
642 }
643
644 func (t *Torrent) haveAnyPieces() bool {
645         for i := range t.pieces {
646                 if t.pieceComplete(i) {
647                         return true
648                 }
649         }
650         return false
651 }
652
653 func (t *Torrent) havePiece(index int) bool {
654         return t.haveInfo() && t.pieceComplete(index)
655 }
656
657 func (t *Torrent) haveChunk(r request) (ret bool) {
658         // defer func() {
659         //      log.Println("have chunk", r, ret)
660         // }()
661         if !t.haveInfo() {
662                 return false
663         }
664         if t.pieceComplete(int(r.Index)) {
665                 return true
666         }
667         p := &t.pieces[r.Index]
668         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
669 }
670
671 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
672         return int(cs.Begin / chunkSize)
673 }
674
675 func (t *Torrent) wantPiece(r request) bool {
676         if !t.wantPieceIndex(int(r.Index)) {
677                 return false
678         }
679         if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
680                 return true
681         }
682         // TODO: What about pieces that were wanted, but aren't now, and aren't
683         // completed either? That used to be done here.
684         return false
685 }
686
687 func (t *Torrent) wantPieceIndex(index int) bool {
688         if !t.haveInfo() {
689                 return false
690         }
691         p := &t.pieces[index]
692         if p.QueuedForHash {
693                 return false
694         }
695         if p.Hashing {
696                 return false
697         }
698         if t.pieceComplete(index) {
699                 return false
700         }
701         if t.pendingPieces.Contains(index) {
702                 return true
703         }
704         return !t.forReaderOffsetPieces(func(begin, end int) bool {
705                 return index < begin || index >= end
706         })
707 }
708
709 func (t *Torrent) forNeededPieces(f func(piece int) (more bool)) (all bool) {
710         return t.forReaderOffsetPieces(func(begin, end int) (more bool) {
711                 for i := begin; begin < end; i++ {
712                         if !f(i) {
713                                 return false
714                         }
715                 }
716                 return true
717         })
718 }
719
720 func (t *Torrent) connHasWantedPieces(c *connection) bool {
721         return !c.pieceRequestOrder.IsEmpty()
722 }
723
724 func (t *Torrent) extentPieces(off, _len int64) (pieces []int) {
725         for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
726                 pieces = append(pieces, int(i))
727         }
728         return
729 }
730
731 func (t *Torrent) worstBadConn(cl *Client) *connection {
732         wcs := t.worstConns(cl)
733         heap.Init(wcs)
734         for wcs.Len() != 0 {
735                 c := heap.Pop(wcs).(*connection)
736                 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
737                         return c
738                 }
739                 if wcs.Len() >= (socketsPerTorrent+1)/2 {
740                         // Give connections 1 minute to prove themselves.
741                         if time.Since(c.completedHandshake) > time.Minute {
742                                 return c
743                         }
744                 }
745         }
746         return nil
747 }
748
749 type PieceStateChange struct {
750         Index int
751         PieceState
752 }
753
754 func (t *Torrent) publishPieceChange(piece int) {
755         cur := t.pieceState(piece)
756         p := &t.pieces[piece]
757         if cur != p.PublicPieceState {
758                 p.PublicPieceState = cur
759                 t.pieceStateChanges.Publish(PieceStateChange{
760                         piece,
761                         cur,
762                 })
763         }
764 }
765
766 func (t *Torrent) pieceNumPendingChunks(piece int) int {
767         if t.pieceComplete(piece) {
768                 return 0
769         }
770         return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
771 }
772
773 func (t *Torrent) pieceAllDirty(piece int) bool {
774         return t.pieces[piece].DirtyChunks.Len() == t.pieceNumChunks(piece)
775 }
776
777 func (t *Torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
778         return t.forReaderOffsetPieces(func(begin, end int) (again bool) {
779                 if begin < end {
780                         if !f(begin) {
781                                 return false
782                         }
783                 }
784                 return true
785         })
786 }
787
788 func (t *Torrent) readersChanged() {
789         t.updatePiecePriorities()
790 }
791
792 func (t *Torrent) maybeNewConns() {
793         // Tickle the accept routine.
794         t.cl.event.Broadcast()
795         t.openNewConns()
796 }
797
798 func (t *Torrent) piecePriorityChanged(piece int) {
799         for _, c := range t.conns {
800                 c.updatePiecePriority(piece)
801         }
802         t.maybeNewConns()
803         t.publishPieceChange(piece)
804 }
805
806 func (t *Torrent) updatePiecePriority(piece int) bool {
807         p := &t.pieces[piece]
808         newPrio := t.piecePriorityUncached(piece)
809         if newPrio == p.priority {
810                 return false
811         }
812         p.priority = newPrio
813         return true
814 }
815
816 // Update all piece priorities in one hit. This function should have the same
817 // output as updatePiecePriority, but across all pieces.
818 func (t *Torrent) updatePiecePriorities() {
819         newPrios := make([]piecePriority, t.numPieces())
820         t.pendingPieces.IterTyped(func(piece int) (more bool) {
821                 newPrios[piece] = PiecePriorityNormal
822                 return true
823         })
824         t.forReaderOffsetPieces(func(begin, end int) (next bool) {
825                 if begin < end {
826                         newPrios[begin].Raise(PiecePriorityNow)
827                 }
828                 for i := begin + 1; i < end; i++ {
829                         newPrios[i].Raise(PiecePriorityReadahead)
830                 }
831                 return true
832         })
833         t.completedPieces.IterTyped(func(piece int) (more bool) {
834                 newPrios[piece] = PiecePriorityNone
835                 return true
836         })
837         for i, prio := range newPrios {
838                 if prio != t.pieces[i].priority {
839                         t.pieces[i].priority = prio
840                         t.piecePriorityChanged(i)
841                 }
842         }
843 }
844
845 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
846         if off >= t.length {
847                 return
848         }
849         if off < 0 {
850                 size += off
851                 off = 0
852         }
853         if size <= 0 {
854                 return
855         }
856         begin = int(off / t.info.PieceLength)
857         end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
858         if end > t.info.NumPieces() {
859                 end = t.info.NumPieces()
860         }
861         return
862 }
863
864 // Returns true if all iterations complete without breaking.
865 func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
866         // There's an oppurtunity here to build a map of beginning pieces, and a
867         // bitmap of the rest. I wonder if it's worth the allocation overhead.
868         for r := range t.readers {
869                 r.mu.Lock()
870                 pos, readahead := r.pos, r.readahead
871                 r.mu.Unlock()
872                 if readahead < 1 {
873                         readahead = 1
874                 }
875                 begin, end := t.byteRegionPieces(pos, readahead)
876                 if begin >= end {
877                         continue
878                 }
879                 if !f(begin, end) {
880                         return false
881                 }
882         }
883         return true
884 }
885
886 func (t *Torrent) piecePriority(piece int) piecePriority {
887         if !t.haveInfo() {
888                 return PiecePriorityNone
889         }
890         return t.pieces[piece].priority
891 }
892
893 func (t *Torrent) piecePriorityUncached(piece int) (ret piecePriority) {
894         ret = PiecePriorityNone
895         if t.pieceComplete(piece) {
896                 return
897         }
898         if t.pendingPieces.Contains(piece) {
899                 ret = PiecePriorityNormal
900         }
901         raiseRet := ret.Raise
902         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
903                 if piece == begin {
904                         raiseRet(PiecePriorityNow)
905                 }
906                 if begin <= piece && piece < end {
907                         raiseRet(PiecePriorityReadahead)
908                 }
909                 return true
910         })
911         return
912 }
913
914 func (t *Torrent) pendPiece(piece int) {
915         if t.pendingPieces.Contains(piece) {
916                 return
917         }
918         if t.havePiece(piece) {
919                 return
920         }
921         t.pendingPieces.Add(piece)
922         if !t.updatePiecePriority(piece) {
923                 return
924         }
925         t.piecePriorityChanged(piece)
926 }
927
928 func (t *Torrent) getCompletedPieces() (ret bitmap.Bitmap) {
929         return t.completedPieces.Copy()
930 }
931
932 func (t *Torrent) unpendPieces(unpend *bitmap.Bitmap) {
933         t.pendingPieces.Sub(unpend)
934         t.updatePiecePriorities()
935 }
936
937 func (t *Torrent) pendPieceRange(begin, end int) {
938         for i := begin; i < end; i++ {
939                 t.pendPiece(i)
940         }
941 }
942
943 func (t *Torrent) unpendPieceRange(begin, end int) {
944         var bm bitmap.Bitmap
945         bm.AddRange(begin, end)
946         t.unpendPieces(&bm)
947 }
948
949 func (t *Torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) {
950         if !c.PeerHasPiece(piece) {
951                 return true
952         }
953         chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
954         return itertools.ForPerm(len(chunkIndices), func(i int) bool {
955                 req := request{pp.Integer(piece), t.chunkIndexSpec(chunkIndices[i], piece)}
956                 return c.Request(req)
957         })
958 }
959
960 func (t *Torrent) pendRequest(req request) {
961         ci := chunkIndex(req.chunkSpec, t.chunkSize)
962         t.pieces[req.Index].pendChunkIndex(ci)
963 }
964
965 func (t *Torrent) pieceChanged(piece int) {
966         t.cl.pieceChanged(t, piece)
967 }
968
969 func (t *Torrent) openNewConns() {
970         t.cl.openNewConns(t)
971 }
972
973 func (t *Torrent) getConnPieceInclination() []int {
974         _ret := t.connPieceInclinationPool.Get()
975         if _ret == nil {
976                 pieceInclinationsNew.Add(1)
977                 return rand.Perm(t.numPieces())
978         }
979         pieceInclinationsReused.Add(1)
980         return _ret.([]int)
981 }
982
983 func (t *Torrent) putPieceInclination(pi []int) {
984         t.connPieceInclinationPool.Put(pi)
985         pieceInclinationsPut.Add(1)
986 }
987
988 func (t *Torrent) updatePieceCompletion(piece int) {
989         pcu := t.pieceCompleteUncached(piece)
990         changed := t.completedPieces.Get(piece) != pcu
991         t.completedPieces.Set(piece, pcu)
992         if changed {
993                 t.pieceChanged(piece)
994         }
995 }
996
997 // Non-blocking read. Client lock is not required.
998 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
999         p := &t.pieces[off/t.info.PieceLength]
1000         p.waitNoPendingWrites()
1001         return p.Storage().ReadAt(b, off-p.Info().Offset())
1002 }
1003
1004 func (t *Torrent) updateAllPieceCompletions() {
1005         for i := range iter.N(t.numPieces()) {
1006                 t.updatePieceCompletion(i)
1007         }
1008 }
1009
1010 func (t *Torrent) maybeMetadataCompleted() {
1011         if t.haveInfo() {
1012                 // Nothing to do.
1013                 return
1014         }
1015         if !t.haveAllMetadataPieces() {
1016                 // Don't have enough metadata pieces.
1017                 return
1018         }
1019         // TODO(anacrolix): If this fails, I think something harsher should be
1020         // done.
1021         err := t.setInfoBytes(t.metadataBytes)
1022         if err != nil {
1023                 log.Printf("error setting metadata: %s", err)
1024                 t.invalidateMetadata()
1025                 return
1026         }
1027         if t.cl.config.Debug {
1028                 log.Printf("%s: got metadata from peers", t)
1029         }
1030 }
1031
1032 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1033         t.forReaderOffsetPieces(func(begin, end int) bool {
1034                 ret.AddRange(begin, end)
1035                 return true
1036         })
1037         return
1038 }
1039
1040 func (t *Torrent) needData() bool {
1041         if !t.haveInfo() {
1042                 return true
1043         }
1044         if t.pendingPieces.Len() != 0 {
1045                 return true
1046         }
1047         return !t.readerPieces().IterTyped(func(piece int) bool {
1048                 return t.pieceComplete(piece)
1049         })
1050 }
1051
1052 func (t *Torrent) addTrackers(announceList [][]string) {
1053         newTrackers := copyTrackers(t.trackers)
1054         for tierIndex, tier := range announceList {
1055                 if tierIndex < len(newTrackers) {
1056                         newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier)
1057                 } else {
1058                         newTrackers = append(newTrackers, mergeTier(nil, tier))
1059                 }
1060                 shuffleTier(newTrackers[tierIndex])
1061         }
1062         t.trackers = newTrackers
1063 }
1064
1065 // Don't call this before the info is available.
1066 func (t *Torrent) bytesCompleted() int64 {
1067         if !t.haveInfo() {
1068                 return 0
1069         }
1070         return t.info.TotalLength() - t.bytesLeft()
1071 }
1072
1073 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1074         t.cl.mu.Lock()
1075         defer t.cl.mu.Unlock()
1076         if t.info != nil {
1077                 return
1078         }
1079         return t.setInfoBytes(b)
1080 }
1081
1082 // Returns true if connection is removed from torrent.Conns.
1083 func (t *Torrent) deleteConnection(c *connection) bool {
1084         for i0, _c := range t.conns {
1085                 if _c != c {
1086                         continue
1087                 }
1088                 i1 := len(t.conns) - 1
1089                 if i0 != i1 {
1090                         t.conns[i0] = t.conns[i1]
1091                 }
1092                 t.conns = t.conns[:i1]
1093                 return true
1094         }
1095         return false
1096 }
1097
1098 func (t *Torrent) dropConnection(c *connection) {
1099         t.cl.event.Broadcast()
1100         c.Close()
1101         if t.deleteConnection(c) {
1102                 t.openNewConns()
1103         }
1104 }
1105
1106 // Returns true when peers are required, or false if the torrent is closing.
1107 func (t *Torrent) waitWantPeers() bool {
1108         t.cl.mu.Lock()
1109         defer t.cl.mu.Unlock()
1110         for {
1111                 if t.closed.IsSet() {
1112                         return false
1113                 }
1114                 if len(t.peers) > torrentPeersLowWater {
1115                         goto wait
1116                 }
1117                 if t.needData() || t.seeding() {
1118                         return true
1119                 }
1120         wait:
1121                 t.wantPeers.Wait()
1122         }
1123 }
1124
1125 // Returns whether the client should make effort to seed the torrent.
1126 func (t *Torrent) seeding() bool {
1127         cl := t.cl
1128         if cl.config.NoUpload {
1129                 return false
1130         }
1131         if !cl.config.Seed {
1132                 return false
1133         }
1134         if t.needData() {
1135                 return false
1136         }
1137         return true
1138 }
1139
1140 // Announce torrent to its trackers.
1141 func (t *Torrent) announceTrackers() {
1142         cl := t.cl
1143         req := tracker.AnnounceRequest{
1144                 Event:    tracker.Started,
1145                 NumWant:  -1,
1146                 Port:     uint16(cl.incomingPeerPort()),
1147                 PeerId:   cl.peerID,
1148                 InfoHash: t.infoHash,
1149         }
1150         if !t.waitWantPeers() {
1151                 return
1152         }
1153         cl.mu.RLock()
1154         req.Left = t.bytesLeftAnnounce()
1155         trackers := t.trackers
1156         cl.mu.RUnlock()
1157         if t.announceTrackersFastStart(&req, trackers) {
1158                 req.Event = tracker.None
1159         }
1160 newAnnounce:
1161         for t.waitWantPeers() {
1162                 cl.mu.RLock()
1163                 req.Left = t.bytesLeftAnnounce()
1164                 trackers = t.trackers
1165                 cl.mu.RUnlock()
1166                 numTrackersTried := 0
1167                 for _, tier := range trackers {
1168                         for trIndex, tr := range tier {
1169                                 numTrackersTried++
1170                                 interval, err := t.announceSingleTracker(tr, &req)
1171                                 if err != nil {
1172                                         // Try the next tracker.
1173                                         continue
1174                                 }
1175                                 // Float the successful announce to the top of the tier. If
1176                                 // the trackers list has been changed, we'll be modifying an
1177                                 // old copy so it won't matter.
1178                                 cl.mu.Lock()
1179                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
1180                                 cl.mu.Unlock()
1181
1182                                 req.Event = tracker.None
1183                                 // Wait the interval before attempting another announce.
1184                                 time.Sleep(interval)
1185                                 continue newAnnounce
1186                         }
1187                 }
1188                 if numTrackersTried != 0 {
1189                         log.Printf("%s: all trackers failed", t)
1190                 }
1191                 // TODO: Wait until trackers are added if there are none.
1192                 time.Sleep(10 * time.Second)
1193         }
1194 }
1195
1196 func (t *Torrent) announceTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier) (atLeastOne bool) {
1197         oks := make(chan bool)
1198         outstanding := 0
1199         for _, tier := range trackers {
1200                 for _, tr := range tier {
1201                         outstanding++
1202                         go func(tr string) {
1203                                 _, err := t.announceSingleTracker(tr, req)
1204                                 oks <- err == nil
1205                         }(tr)
1206                 }
1207         }
1208         for outstanding > 0 {
1209                 ok := <-oks
1210                 outstanding--
1211                 if ok {
1212                         atLeastOne = true
1213                 }
1214         }
1215         return
1216 }
1217
1218 func (t *Torrent) announceSingleTracker(tr string, req *tracker.AnnounceRequest) (interval time.Duration, err error) {
1219         blocked, err := t.cl.trackerBlockedUnlocked(tr)
1220         if err != nil {
1221                 err = fmt.Errorf("error determining if tracker blocked: %s", err)
1222                 return
1223         }
1224         if blocked {
1225                 err = errors.New("tracker has blocked IP")
1226                 return
1227         }
1228         resp, err := tracker.Announce(tr, req)
1229         if err != nil {
1230                 return
1231         }
1232         var peers []Peer
1233         for _, peer := range resp.Peers {
1234                 peers = append(peers, Peer{
1235                         IP:   peer.IP,
1236                         Port: peer.Port,
1237                 })
1238         }
1239         t.AddPeers(peers)
1240         interval = time.Second * time.Duration(resp.Interval)
1241         return
1242 }