2 Package torrent implements a torrent client.
9 if err := c.AddTorrent(externalMetaInfoPackageSux); err != nil {
10 return fmt.Errorf("error adding torrent: %s", err)
13 log.Print("erhmahgerd, torrent downloaded")
33 metainfo "github.com/nsf/libtorgo/torrent"
35 pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
36 "bitbucket.org/anacrolix/go.torrent/tracker"
37 _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
40 // Currently doesn't really queue, but should in the future.
41 func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) {
42 piece := t.Pieces[pieceIndex]
43 if piece.QueuedForHash {
46 piece.QueuedForHash = true
47 go cl.verifyPiece(t, pieceIndex)
50 // Queues the torrent data for the given region for download. The beginning of
51 // the region is given highest priority to allow a subsequent read at the same
52 // offset to return data ASAP.
53 func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error {
58 return errors.New("no such active torrent")
60 newPriorities := make([]request, 0, (len_+chunkSize-1)/chunkSize)
62 req, ok := t.offsetRequest(off)
64 return errors.New("bad offset")
66 reqOff := t.requestOffset(req)
67 // Gain the alignment adjustment.
69 // Lose the length of this block.
70 len_ -= int64(req.Length)
71 off = reqOff + int64(req.Length)
72 if !t.wantPiece(int(req.Index)) {
75 newPriorities = append(newPriorities, req)
77 if len(newPriorities) == 0 {
80 t.Priorities.PushFront(newPriorities[0])
81 for _, req := range newPriorities[1:] {
82 t.Priorities.PushBack(req)
84 for _, cn := range t.Conns {
85 me.replenishConnRequests(t, cn)
90 type dataSpec struct {
101 DownloadStrategy DownloadStrategy
108 torrents map[InfoHash]*torrent
109 dataWaiter chan struct{}
112 // Read torrent data at the given offset. Returns ErrDataNotReady if the data
114 func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
119 err = errors.New("unknown torrent")
122 index := pp.Integer(off / t.MetaInfo.PieceLength)
123 // Reading outside the bounds of a file is an error.
128 if int(index) >= len(t.Pieces) {
132 t.lastReadPiece = int(index)
133 piece := t.Pieces[index]
134 pieceOff := pp.Integer(off % int64(t.PieceLength(0)))
135 high := int(t.PieceLength(index) - pieceOff)
139 for cs, _ := range piece.PendingChunkSpecs {
140 chunkOff := int64(pieceOff) - int64(cs.Begin)
141 if chunkOff >= int64(t.PieceLength(index)) {
144 if 0 <= chunkOff && chunkOff < int64(cs.Length) {
145 // read begins in a pending chunk
146 err = ErrDataNotReady
149 // pending chunk caps available data
150 if chunkOff < 0 && int64(len(p)) > -chunkOff {
154 return t.Data.ReadAt(p, off)
157 // Starts the client. Defaults are applied. The client will begin accepting
158 // connections and tracking.
159 func (c *Client) Start() {
161 c.torrents = make(map[InfoHash]*torrent)
162 if c.HalfOpenLimit == 0 {
165 o := copy(c.PeerId[:], BEP20)
166 _, err := rand.Read(c.PeerId[o:])
168 panic("error generating peer id")
170 c.quit = make(chan struct{})
171 if c.DownloadStrategy == nil {
172 c.DownloadStrategy = &DefaultDownloadStrategy{}
174 if c.Listener != nil {
175 go c.acceptConnections()
179 func (cl *Client) stopped() bool {
188 // Stops the client. All connections to peers are closed and all activity will
190 func (me *Client) Stop() {
194 for _, t := range me.torrents {
195 for _, c := range t.Conns {
202 func (cl *Client) acceptConnections() {
204 conn, err := cl.Listener.Accept()
215 if err := cl.runConnection(conn, nil); err != nil {
222 func (me *Client) torrent(ih InfoHash) *torrent {
223 for _, t := range me.torrents {
224 if t.InfoHash == ih {
231 func (me *Client) initiateConn(peer Peer, torrent *torrent) {
232 if peer.Id == me.PeerId {
237 addr := &net.TCPAddr{
241 conn, err := net.DialTimeout(addr.Network(), addr.String(), dialTimeout)
246 if me.halfOpen == 0 {
253 if netOpErr, ok := err.(*net.OpError); ok {
254 if netOpErr.Timeout() {
257 switch netOpErr.Err {
258 case syscall.ECONNREFUSED, syscall.EHOSTUNREACH:
263 log.Printf("error connecting to peer: %s %#v", err, err)
266 log.Printf("connected to %s", conn.RemoteAddr())
267 err = me.runConnection(conn, torrent)
274 func (me *Client) runConnection(sock net.Conn, torrent *torrent) (err error) {
279 write: make(chan []byte),
280 post: make(chan pp.Message),
281 PeerMaxRequests: 250,
284 // There's a lock and deferred unlock later in this function. The
285 // client will not be locked when this deferred is invoked.
291 // go conn.writeOptimizer()
292 conn.write <- pp.Bytes(pp.Protocol)
293 conn.write <- pp.Bytes("\x00\x00\x00\x00\x00\x00\x00\x00")
295 conn.write <- pp.Bytes(torrent.InfoHash[:])
296 conn.write <- pp.Bytes(me.PeerId[:])
299 _, err = io.ReadFull(conn.Socket, b[:])
304 err = fmt.Errorf("when reading protocol and extensions: %s", err)
307 if string(b[:20]) != pp.Protocol {
308 err = fmt.Errorf("wrong protocol: %#v", string(b[:20]))
311 if 8 != copy(conn.PeerExtensions[:], b[20:]) {
314 // log.Printf("peer extensions: %#v", string(conn.PeerExtensions[:]))
315 var infoHash [20]byte
316 _, err = io.ReadFull(conn.Socket, infoHash[:])
318 return fmt.Errorf("reading peer info hash: %s", err)
320 _, err = io.ReadFull(conn.Socket, conn.PeerId[:])
322 return fmt.Errorf("reading peer id: %s", err)
325 torrent = me.torrent(infoHash)
329 conn.write <- pp.Bytes(torrent.InfoHash[:])
330 conn.write <- pp.Bytes(me.PeerId[:])
334 if !me.addConnection(torrent, conn) {
337 go conn.writeOptimizer(time.Minute)
338 if torrent.haveAnyPieces() {
339 conn.Post(pp.Message{
341 Bitfield: torrent.bitfield(),
344 err = me.connectionLoop(torrent, conn)
346 err = fmt.Errorf("during Connection loop: %s", err)
348 me.dropConnection(torrent, conn)
352 func (me *Client) peerGotPiece(torrent *torrent, conn *connection, piece int) {
353 if conn.PeerPieces == nil {
354 conn.PeerPieces = make([]bool, len(torrent.Pieces))
356 conn.PeerPieces[piece] = true
357 if torrent.wantPiece(piece) {
358 me.replenishConnRequests(torrent, conn)
362 func (me *Client) peerUnchoked(torrent *torrent, conn *connection) {
363 me.replenishConnRequests(torrent, conn)
366 func (cl *Client) connCancel(t *torrent, cn *connection, r request) (ok bool) {
369 cl.DownloadStrategy.DeleteRequest(t, r)
374 func (cl *Client) connDeleteRequest(t *torrent, cn *connection, r request) {
375 if !cn.RequestPending(r) {
378 cl.DownloadStrategy.DeleteRequest(t, r)
379 delete(cn.Requests, r)
382 func (me *Client) connectionLoop(t *torrent, c *connection) error {
383 decoder := pp.Decoder{
384 R: bufio.NewReader(c.Socket),
385 MaxLength: 256 * 1024,
390 err := decoder.Decode(&msg)
393 if me.stopped() || err == io.EOF {
404 for r := range c.Requests {
405 me.connDeleteRequest(t, c, r)
409 me.peerUnchoked(t, c)
411 c.PeerInterested = true
412 // TODO: This should be done from a dedicated unchoking routine.
414 case pp.NotInterested:
415 c.PeerInterested = false
418 me.peerGotPiece(t, c, int(msg.Index))
420 if c.PeerRequests == nil {
421 c.PeerRequests = make(map[request]struct{}, maxRequests)
423 request := newRequest(msg.Index, msg.Begin, msg.Length)
424 c.PeerRequests[request] = struct{}{}
425 // TODO: Requests should be satisfied from a dedicated upload routine.
426 p := make([]byte, msg.Length)
427 n, err := t.Data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
429 return fmt.Errorf("reading t data to serve request %s: %s", request, err)
431 if n != int(msg.Length) {
432 return fmt.Errorf("bad request: %s", msg)
441 req := newRequest(msg.Index, msg.Begin, msg.Length)
442 if !c.PeerCancel(req) {
443 log.Printf("received unexpected cancel: %v", req)
446 if len(msg.Bitfield) < len(t.Pieces) {
447 err = errors.New("received invalid bitfield")
450 if c.PeerPieces != nil {
451 err = errors.New("received unexpected bitfield")
454 c.PeerPieces = msg.Bitfield[:len(t.Pieces)]
455 for index, has := range c.PeerPieces {
457 me.peerGotPiece(t, c, index)
461 err = me.downloadedChunk(t, c, &msg)
463 err = fmt.Errorf("received unknown message type: %#v", msg.Type)
471 func (me *Client) dropConnection(torrent *torrent, conn *connection) {
473 for r := range conn.Requests {
474 me.connDeleteRequest(torrent, conn, r)
476 for i0, c := range torrent.Conns {
480 i1 := len(torrent.Conns) - 1
482 torrent.Conns[i0] = torrent.Conns[i1]
484 torrent.Conns = torrent.Conns[:i1]
487 panic("connection not found")
490 func (me *Client) addConnection(t *torrent, c *connection) bool {
491 for _, c0 := range t.Conns {
492 if c.PeerId == c0.PeerId {
493 // Already connected to a client with that ID.
497 t.Conns = append(t.Conns, c)
501 func (me *Client) openNewConns() {
502 for _, t := range me.torrents {
503 for len(t.Peers) != 0 {
504 if me.halfOpen >= me.HalfOpenLimit {
508 t.Peers = t.Peers[1:]
509 me.initiateConn(p, t)
514 // Adds peers to the swarm for the torrent corresponding to infoHash.
515 func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
517 t := me.torrent(infoHash)
519 return errors.New("no such torrent")
521 t.Peers = append(t.Peers, peers...)
527 // Prepare a Torrent without any attachment to a Client. That means we can
528 // initialize all fields that don't require the Client without locking
530 func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (t *torrent, err error) {
532 InfoHash: BytesInfoHash(metaInfo.InfoHash),
535 t.Data, err = mmapTorrentData(metaInfo, dataDir)
539 for offset := 0; offset < len(metaInfo.Pieces); offset += pieceHash.Size() {
540 hash := metaInfo.Pieces[offset : offset+pieceHash.Size()]
541 if len(hash) != pieceHash.Size() {
542 err = errors.New("bad piece hash in metainfo")
546 copyHashSum(piece.Hash[:], hash)
547 t.Pieces = append(t.Pieces, piece)
548 t.pendAllChunkSpecs(pp.Integer(len(t.Pieces) - 1))
550 t.Trackers = make([][]tracker.Client, len(metaInfo.AnnounceList))
551 for tierIndex := range metaInfo.AnnounceList {
552 tier := t.Trackers[tierIndex]
553 for _, url := range metaInfo.AnnounceList[tierIndex] {
554 tr, err := tracker.New(url)
559 tier = append(tier, tr)
561 // The trackers within each tier must be shuffled before use.
562 // http://stackoverflow.com/a/12267471/149482
563 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
564 for i := range tier {
565 j := mathRand.Intn(i + 1)
566 tier[i], tier[j] = tier[j], tier[i]
568 t.Trackers[tierIndex] = tier
573 // Adds the torrent to the client.
574 func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
575 torrent, err := newTorrent(metaInfo, me.DataDir)
581 if _, ok := me.torrents[torrent.InfoHash]; ok {
582 return torrent.Close()
584 me.torrents[torrent.InfoHash] = torrent
585 me.DownloadStrategy.TorrentStarted(torrent)
586 if !me.DisableTrackers {
587 go me.announceTorrent(torrent)
589 torrent.Priorities = list.New()
591 // Queue all pieces for hashing. This is done sequentially to avoid
592 // spamming goroutines.
593 for _, p := range torrent.Pieces {
594 p.QueuedForHash = true
597 for i := range torrent.Pieces {
598 me.verifyPiece(torrent, pp.Integer(i))
605 func (cl *Client) listenerAnnouncePort() (port int16) {
611 switch data := addr.(type) {
613 return int16(data.Port)
615 return int16(data.Port)
617 log.Printf("unknown listener addr type: %T", addr)
622 func (cl *Client) announceTorrent(t *torrent) {
623 req := tracker.AnnounceRequest{
624 Event: tracker.Started,
626 Port: cl.listenerAnnouncePort(),
628 InfoHash: t.InfoHash,
633 req.Left = t.BytesLeft()
635 for _, tier := range t.Trackers {
636 for trIndex, tr := range tier {
637 if err := tr.Connect(); err != nil {
641 resp, err := tr.Announce(&req)
647 for _, peer := range resp.Peers {
648 peers = append(peers, Peer{
653 err = cl.AddPeers(t.InfoHash, peers)
657 log.Printf("%s: %d new peers from %s", t, len(peers), tr)
659 tier[0], tier[trIndex] = tier[trIndex], tier[0]
660 time.Sleep(time.Second * time.Duration(resp.Interval))
661 req.Event = tracker.None
665 time.Sleep(5 * time.Second)
669 func (cl *Client) allTorrentsCompleted() bool {
670 for _, t := range cl.torrents {
671 if !t.haveAllPieces() {
678 // Returns true when all torrents are completely downloaded and false if the
679 // client is stopped.
680 func (me *Client) WaitAll() bool {
683 for !me.allTorrentsCompleted() {
692 type DownloadStrategy interface {
693 FillRequests(t *torrent, c *connection)
694 TorrentStarted(t *torrent)
695 TorrentStopped(t *torrent)
696 DeleteRequest(t *torrent, r request)
699 type DefaultDownloadStrategy struct {
700 heat map[*torrent]map[request]int
703 func (cl *Client) assertRequestHeat() {
704 dds, ok := cl.DownloadStrategy.(*DefaultDownloadStrategy)
708 for _, t := range cl.torrents {
709 m := make(map[request]int, 3000)
710 for _, cn := range t.Conns {
711 for r := range cn.Requests {
715 for r, h := range dds.heat[t] {
717 panic(fmt.Sprintln(m[r], h))
723 func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
725 addRequest := func(req request) (again bool) {
726 piece := t.Pieces[req.Index]
728 // We can't be sure we want this.
731 if piece.Complete() {
732 // We already have this.
735 if c.RequestPending(req) {
738 again = c.Request(req)
739 if c.RequestPending(req) {
744 // First request prioritized chunks.
745 for e := t.Priorities.Front(); e != nil; e = e.Next() {
746 if !addRequest(e.Value.(request)) {
750 ppbs := t.piecesByPendingBytes()
751 // Then finish off incomplete pieces in order of bytes remaining.
752 for _, heatThreshold := range []int{0, 4, 100} {
753 for _, pieceIndex := range ppbs {
754 for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
755 r := request{pieceIndex, chunkSpec}
756 if th[r] > heatThreshold {
759 if !addRequest(request{pieceIndex, chunkSpec}) {
767 func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) {
768 if s.heat[t] != nil {
769 panic("torrent already started")
772 s.heat = make(map[*torrent]map[request]int, 10)
774 s.heat[t] = make(map[request]int, t.ChunkCount())
777 func (s *DefaultDownloadStrategy) TorrentStopped(t *torrent) {
778 if _, ok := s.heat[t]; !ok {
779 panic("torrent not yet started")
784 func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) {
787 panic("no pending requests")
792 type ResponsiveDownloadStrategy struct {
793 // How many bytes to preemptively download starting at the beginning of
794 // the last piece read for a given torrent.
798 func (ResponsiveDownloadStrategy) TorrentStarted(*torrent) {} // No-op: this strategy does nothing when a torrent starts.
799 func (ResponsiveDownloadStrategy) TorrentStopped(*torrent) {} // No-op: this strategy does nothing when a torrent stops.
800 func (ResponsiveDownloadStrategy) DeleteRequest(*torrent, request) {} // No-op: this strategy does nothing when a request is deleted.
802 func (me *ResponsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
803 for e := t.Priorities.Front(); e != nil; e = e.Next() {
804 if !c.Request(e.Value.(request)) {
808 readaheadPieces := (me.Readahead + t.UsualPieceSize() - 1) / t.UsualPieceSize()
809 for i := t.lastReadPiece; i < t.lastReadPiece+readaheadPieces && i < t.NumPieces(); i++ {
810 for cs := range t.Pieces[i].PendingChunkSpecs {
811 if !c.Request(request{pp.Integer(i), cs}) {
816 // Then finish off incomplete pieces in order of bytes remaining.
817 for _, index := range t.piecesByPendingBytes() {
818 // Stop when we're onto untouched pieces.
819 if t.PieceNumPendingBytes(index) == t.PieceLength(index) {
822 for chunkSpec := range t.Pieces[index].PendingChunkSpecs {
823 if !c.Request(request{index, chunkSpec}) {
830 func (me *Client) replenishConnRequests(t *torrent, c *connection) {
831 me.DownloadStrategy.FillRequests(t, c)
832 if len(c.Requests) == 0 {
833 c.SetInterested(false)
837 func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) error {
838 req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
840 // Request has been satisfied.
841 me.connDeleteRequest(t, c, req)
843 defer me.replenishConnRequests(t, c)
845 // Do we actually want this chunk?
846 if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
850 // Write the chunk out.
851 err := t.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
855 me.dataReady(dataSpec{t.InfoHash, req})
857 // Cancel pending requests for this chunk.
858 for _, c := range t.Conns {
859 if me.connCancel(t, c, req) {
860 me.replenishConnRequests(t, c)
864 // Record that we have the chunk.
865 delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
866 if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
867 me.queuePieceCheck(t, req.Index)
870 // Unprioritize the chunk.
871 var next *list.Element
872 for e := t.Priorities.Front(); e != nil; e = next {
874 if e.Value.(request) == req {
875 t.Priorities.Remove(e)
882 func (cl *Client) dataReady(ds dataSpec) {
883 if cl.dataWaiter != nil {
889 // Returns a channel that is closed when new data has become available in the
891 func (me *Client) DataWaiter() <-chan struct{} {
894 if me.dataWaiter == nil {
895 me.dataWaiter = make(chan struct{})
900 func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
904 p.PendingChunkSpecs = nil
905 log.Printf("%s: got piece %d, (%d/%d)", t, piece, t.NumPiecesCompleted(), t.NumPieces())
906 var next *list.Element
907 for e := t.Priorities.Front(); e != nil; e = next {
909 if e.Value.(request).Index == piece {
910 t.Priorities.Remove(e)
913 me.dataReady(dataSpec{
917 chunkSpec{0, pp.Integer(t.PieceLength(piece))},
921 if len(p.PendingChunkSpecs) == 0 {
922 t.pendAllChunkSpecs(piece)
925 for _, conn := range t.Conns {
927 conn.Post(pp.Message{
929 Index: pp.Integer(piece),
931 // TODO: Cancel requests for this piece.
933 if conn.PeerHasPiece(piece) {
934 me.replenishConnRequests(t, conn)
941 func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
948 p.QueuedForHash = false
950 sum := t.HashPiece(index)
953 cl.pieceHashed(t, index, sum == p.Hash)
957 func (me *Client) Torrents() (ret []*torrent) {
959 for _, t := range me.torrents {