19 metainfo "github.com/nsf/libtorgo/torrent"
21 "bitbucket.org/anacrolix/go.torrent/peer_protocol"
22 "bitbucket.org/anacrolix/go.torrent/tracker"
23 _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
26 // Currently doesn't really queue, but should in the future.
//
// queuePieceCheck flags piece pieceIndex of t as queued for hashing and starts
// cl.verifyPiece for it on a new goroutine.
27 func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) {
28 piece := t.Pieces[pieceIndex]
// The body of this branch is not visible here; presumably it returns early so
// that an already-queued piece is not verified twice -- confirm.
29 if piece.QueuedForHash {
32 piece.QueuedForHash = true
33 go cl.verifyPiece(t, pieceIndex)
// PrioritizeDataRegion walks the byte region [off, off+len_) one Request at a
// time, collects Requests into newPriorities, pushes them onto t.Priorities
// and then replenishes the requests of every connection in t.Conns.
// t is obtained on a line not shown here (presumably looked up from ih).
36 func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
// Capacity estimate: the number of chunkSize-sized chunks a region of len_
// bytes can touch when it is not chunk-aligned at either end.
40 newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize)
42 // TODO: Write a function to return the Request for a given offset.
43 req, ok := t.offsetRequest(off)
// Advance past the chunk covered by req.
47 off += int64(req.Length)
48 len_ -= int64(req.Length)
// ok is reused to test whether the chunk is still pending for its piece. The
// branch body is not shown; presumably non-pending chunks are skipped.
49 if _, ok = t.Pieces[req.Index].PendingChunkSpecs[req.ChunkSpec]; !ok {
52 newPriorities = append(newPriorities, req)
54 if len(newPriorities) == 0 {
// The priority list is created on first use.
57 if t.Priorities == nil {
58 t.Priorities = list.New()
// The first request of the region goes to the front of the list; the rest are
// appended at the back.
// NOTE(review): the remaining requests therefore queue behind any previously
// prioritized entries -- confirm this ordering is intended.
60 t.Priorities.PushFront(newPriorities[0])
61 for _, req := range newPriorities[1:] {
62 t.Priorities.PushBack(req)
64 for _, cn := range t.Conns {
65 cl.replenishConnRequests(t, cn)
// DataSpec is constructed elsewhere in this file from a torrent's InfoHash and
// a Request and handed to Client.dataReady. Its field list is not visible here.
69 type DataSpec struct {
// torrents holds the known torrents keyed by info hash; Client.Start allocates
// the map.
87 torrents map[InfoHash]*Torrent
// dataWaiter stays nil until Client.DataWaiter creates it; Client.dataReady
// only acts when it is non-nil.
88 dataWaiter chan struct{}
// TorrentReadAt reads torrent data starting at byte offset off into p by
// delegating to t.Data.ReadAt. The errors visible here are "unknown torrent"
// and ErrDataNotReady; the conditions guarding several of these assignments,
// and the lookup of t itself, are on lines not shown.
91 func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
96 err = errors.New("unknown torrent")
// Index of the piece that contains off.
99 index := peer_protocol.Integer(off / t.MetaInfo.PieceLength)
100 // Reading outside the bounds of a file is an error.
105 if int(index) >= len(t.Pieces) {
109 piece := t.Pieces[index]
// A piece that has never been hashed is queued for verification.
110 if !piece.EverHashed {
111 cl.queuePieceCheck(t, index)
114 err = ErrDataNotReady
// Offset of the read inside its piece, using the length of piece 0 as the
// piece size.
// NOTE(review): L99 uses t.MetaInfo.PieceLength while this line uses
// t.PieceLength(0); presumably the two are equal -- confirm.
117 pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
// Number of bytes from pieceOff to the end of this piece.
118 high := int(t.PieceLength(index) - pieceOff)
122 for cs, _ := range piece.PendingChunkSpecs {
// Position of the read start relative to the start of pending chunk cs;
// negative when the chunk begins after the read start.
123 chunkOff := int64(pieceOff) - int64(cs.Begin)
124 if chunkOff >= int64(t.PieceLength(index)) {
127 if 0 <= chunkOff && chunkOff < int64(cs.Length) {
128 // read begins in a pending chunk
129 err = ErrDataNotReady
132 // pending chunk caps available data
133 if chunkOff < 0 && int64(len(p)) > -chunkOff {
137 return t.Data.ReadAt(p, off)
// Start initializes the client: it allocates the torrent map, builds the peer
// ID, creates the quit channel and, when a Listener is configured, starts
// accepting incoming connections on a separate goroutine.
140 func (c *Client) Start() {
143 c.torrents = make(map[InfoHash]*Torrent)
// A zero HalfOpenLimit is special-cased; the value assigned is not shown
// (presumably a default).
144 if c.HalfOpenLimit == 0 {
// The peer ID is the BEP20 prefix followed by random bytes filling the rest.
147 o := copy(c.PeerId[:], BEP20)
148 _, err := rand.Read(c.PeerId[o:])
150 panic("error generating peer id")
152 c.quit = make(chan struct{})
153 if c.Listener != nil {
154 go c.acceptConnections()
// stopped returns a bool that connectionLoop consults after a decode error.
// The body is not visible here; presumably it reports whether the client has
// been stopped -- confirm.
158 func (cl *Client) stopped() bool {
// Stop visits every connection of every torrent known to the client. What is
// done to each connection is on lines not shown (presumably it is closed).
167 func (me *Client) Stop() {
171 for _, t := range me.torrents {
172 for _, c := range t.Conns {
// acceptConnections accepts incoming connections from cl.Listener and runs
// each through runConnection with a nil torrent, so the torrent has to be
// determined from the info hash received during the handshake.
179 func (cl *Client) acceptConnections() {
181 conn, err := cl.Listener.Accept()
192 if err := cl.runConnection(conn, nil); err != nil {
// torrent searches the client's torrents for one whose InfoHash equals ih.
// The return statements are not visible here.
// NOTE(review): me.torrents is a map keyed by InfoHash, so a direct lookup
// (me.torrents[ih]) looks equivalent to this linear scan -- confirm.
199 func (me *Client) torrent(ih InfoHash) *Torrent {
200 for _, t := range me.torrents {
201 if t.InfoHash == ih {
// initiateConn dials peer over TCP with dialTimeout and, on success, runs the
// connection for torrent through runConnection.
208 func (me *Client) initiateConn(peer Peer, torrent *Torrent) {
// A peer carrying our own ID is special-cased; the branch body is not shown
// (presumably the peer is skipped so we never connect to ourselves).
209 if peer.Id == me.PeerId {
214 addr := &net.TCPAddr{
218 conn, err := net.DialTimeout(addr.Network(), addr.String(), dialTimeout)
223 if me.halfOpen == 0 {
// Timeouts and refused connections are singled out from other dial errors;
// their handling is not shown (presumably they are not logged).
// NOTE(review): netOpErr.Err may wrap the errno (e.g. in *os.SyscallError) on
// some Go versions, in which case the direct comparison below would not match
// -- confirm against the Go version in use.
230 if netOpErr, ok := err.(*net.OpError); ok {
231 if netOpErr.Timeout() {
234 switch netOpErr.Err {
235 case syscall.ECONNREFUSED:
240 log.Printf("error connecting to peer: %s %#v", err, err)
243 log.Printf("connected to %s", conn.RemoteAddr())
244 err = me.runConnection(conn, torrent)
// runConnection performs the peer handshake on sock, registers the resulting
// connection with its torrent, runs connectionLoop and finally drops the
// connection. torrent is nil for accepted connections (see acceptConnections)
// and is then resolved from the info hash sent by the peer.
251 func (me *Client) runConnection(sock net.Conn, torrent *Torrent) (err error) {
256 write: make(chan []byte),
257 post: make(chan encoding.BinaryMarshaler),
260 // There's a lock and deferred unlock later in this function. The
261 // client will not be locked when this deferred is invoked.
267 go conn.writeOptimizer()
// Outgoing handshake: protocol identifier followed by 8 zeroed extension bytes.
268 conn.post <- peer_protocol.Bytes(peer_protocol.Protocol)
269 conn.post <- peer_protocol.Bytes("\x00\x00\x00\x00\x00\x00\x00\x00")
// Info hash and peer ID are posted here when the torrent is already known
// (the guarding condition is not shown; presumably torrent != nil).
271 conn.post <- peer_protocol.Bytes(torrent.InfoHash[:])
272 conn.post <- peer_protocol.Bytes(me.PeerId[:])
// Incoming handshake: b holds the 20-byte protocol identifier followed by the
// peer's 8 extension bytes.
275 _, err = io.ReadFull(conn.Socket, b[:])
280 err = fmt.Errorf("when reading protocol and extensions: %s", err)
283 if string(b[:20]) != peer_protocol.Protocol {
284 err = fmt.Errorf("wrong protocol: %#v", string(b[:20]))
287 if 8 != copy(conn.PeerExtensions[:], b[20:]) {
290 // log.Printf("peer extensions: %#v", string(conn.PeerExtensions[:]))
291 var infoHash [20]byte
292 _, err = io.ReadFull(conn.Socket, infoHash[:])
294 return fmt.Errorf("reading peer info hash: %s", err)
296 _, err = io.ReadFull(conn.Socket, conn.PeerId[:])
298 return fmt.Errorf("reading peer id: %s", err)
// Resolve the torrent from the peer's info hash and answer with our half of
// the handshake (presumably only on the accepted-connection path -- the
// surrounding condition is not shown).
301 torrent = me.torrent(infoHash)
305 conn.post <- peer_protocol.Bytes(torrent.InfoHash[:])
306 conn.post <- peer_protocol.Bytes(me.PeerId[:])
310 if !me.addConnection(torrent, conn) {
// Advertise our pieces only when we have at least one.
313 if torrent.haveAnyPieces() {
314 conn.Post(peer_protocol.Message{
315 Type: peer_protocol.Bitfield,
316 Bitfield: torrent.bitfield(),
319 err = me.connectionLoop(torrent, conn)
321 err = fmt.Errorf("during Connection loop: %s", err)
323 me.dropConnection(torrent, conn)
// peerGotPiece records that the peer behind conn has the given piece and, if
// we want that piece, tops up the requests on that connection.
// NOTE(review): no bounds check on piece is visible in the lines shown; the
// Have handler passes a peer-supplied index -- confirm it is validated.
327 func (me *Client) peerGotPiece(torrent *Torrent, conn *Connection, piece int) {
// The per-peer piece table is allocated on first use, one entry per piece.
328 if conn.PeerPieces == nil {
329 conn.PeerPieces = make([]bool, len(torrent.Pieces))
331 conn.PeerPieces[piece] = true
332 if torrent.wantPiece(piece) {
333 me.replenishConnRequests(torrent, conn)
// peerUnchoked is called when the peer unchokes us; it tops up the requests on
// conn.
337 func (me *Client) peerUnchoked(torrent *Torrent, conn *Connection) {
338 me.replenishConnRequests(torrent, conn)
// connectionLoop decodes peer-protocol messages from conn.Socket and
// dispatches on their type, updating the connection state and the torrent.
// It returns an error for I/O failures and for the protocol violations
// detected below; the loop construct and several branch bodies are on lines
// not shown.
341 func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error {
// Messages are read through a buffered reader; MaxLength is 256 KiB.
342 decoder := peer_protocol.Decoder{
343 R: bufio.NewReader(conn.Socket),
344 MaxLength: 256 * 1024,
348 // TODO: Can this be allocated on the stack?
349 msg := new(peer_protocol.Message)
350 err := decoder.Decode(msg)
// A stopped client or a clean EOF is treated separately from other decode
// errors (the branch body is not shown).
353 if me.stopped() || err == io.EOF {
362 case peer_protocol.Choke:
363 conn.PeerChoked = true
365 case peer_protocol.Unchoke:
366 conn.PeerChoked = false
367 me.peerUnchoked(torrent, conn)
368 case peer_protocol.Interested:
369 conn.PeerInterested = true
370 // TODO: This should be done from a dedicated unchoking routine.
372 case peer_protocol.NotInterested:
373 conn.PeerInterested = false
374 case peer_protocol.Have:
375 me.peerGotPiece(torrent, conn, int(msg.Index))
376 case peer_protocol.Request:
// The set of peer requests is created lazily with room for maxRequests.
377 if conn.PeerRequests == nil {
378 conn.PeerRequests = make(map[Request]struct{}, maxRequests)
382 ChunkSpec: ChunkSpec{msg.Begin, msg.Length},
384 conn.PeerRequests[request] = struct{}{}
385 // TODO: Requests should be satisfied from a dedicated upload routine.
// NOTE(review): p is sized from the peer-supplied msg.Length; no upper bound
// check is visible in the lines shown -- confirm one exists.
386 p := make([]byte, msg.Length)
// Absolute data offset = piece index * piece size (length of piece 0) + begin.
// n and err are new variables scoped to this case.
387 n, err := torrent.Data.ReadAt(p, int64(torrent.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
389 return fmt.Errorf("reading torrent data to serve request %s: %s", request, err)
// A short read is reported as a bad request.
391 if n != int(msg.Length) {
392 return fmt.Errorf("bad request: %s", msg)
394 conn.Post(peer_protocol.Message{
395 Type: peer_protocol.Piece,
400 case peer_protocol.Bitfield:
// The bitfield must cover every piece and may only arrive before any other
// piece information has been recorded for this peer.
401 if len(msg.Bitfield) < len(torrent.Pieces) {
402 err = errors.New("received invalid bitfield")
405 if conn.PeerPieces != nil {
406 err = errors.New("received unexpected bitfield")
// Entries beyond the number of pieces are dropped.
409 conn.PeerPieces = msg.Bitfield[:len(torrent.Pieces)]
410 for index, has := range conn.PeerPieces {
412 me.peerGotPiece(torrent, conn, index)
415 case peer_protocol.Piece:
// A received chunk must match one of our outstanding requests exactly (index,
// begin and length).
416 request_ := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
417 if _, ok := conn.Requests[request_]; !ok {
418 err = fmt.Errorf("unexpected piece: %s", request_)
421 delete(conn.Requests, request_)
422 err = me.downloadedChunk(torrent, msg)
424 log.Printf("received unknown message type: %#v", msg.Type)
429 me.replenishConnRequests(torrent, conn)
// dropConnection removes conn from torrent.Conns and panics if it is not
// present. Removal does not preserve order: the last element is moved into the
// vacated slot and the slice is shortened by one.
433 func (me *Client) dropConnection(torrent *Torrent, conn *Connection) {
435 for i0, c := range torrent.Conns {
439 i1 := len(torrent.Conns) - 1
441 torrent.Conns[i0] = torrent.Conns[i1]
443 torrent.Conns = torrent.Conns[:i1]
446 panic("no such Connection")
// addConnection appends c to t.Conns. A connection whose PeerId matches an
// existing connection is logged; the return statements are not shown, but
// runConnection treats a false result as failure (presumably duplicates are
// rejected with false).
449 func (me *Client) addConnection(t *Torrent, c *Connection) bool {
450 for _, c0 := range t.Conns {
451 if c.PeerId == c0.PeerId {
452 log.Printf("%s and %s have the same ID: %s", c.Socket.RemoteAddr(), c0.Socket.RemoteAddr(), c.PeerId)
456 t.Conns = append(t.Conns, c)
// openNewConns takes queued peers off the front of each torrent's Peers slice
// and initiates connections to them, checking the number of half-open
// connections against HalfOpenLimit before each one. p is assigned on a line
// not shown (presumably t.Peers[0]).
460 func (me *Client) openNewConns() {
461 for _, t := range me.torrents {
462 for len(t.Peers) != 0 {
463 if me.halfOpen >= me.HalfOpenLimit {
467 t.Peers = t.Peers[1:]
468 me.initiateConn(p, t)
// AddPeers appends peers to the pending peer list of the torrent identified by
// infoHash. The "no such torrent" error is returned under a condition not
// shown here (presumably when the lookup yields nil).
473 func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
475 t := me.torrent(infoHash)
477 return errors.New("no such torrent")
479 t.Peers = append(t.Peers, peers...)
485 // Prepare a Torrent without any attachment to a Client. That means we can
486 // initialize all fields that don't require the Client without locking
//
// newTorrent maps the torrent data under dataDir, creates one piece per hash
// in metaInfo.Pieces with all of its chunks pending, and builds the shuffled
// tracker tiers from metaInfo.AnnounceList.
488 func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (torrent *Torrent, err error) {
490 InfoHash: BytesInfoHash(metaInfo.InfoHash),
493 torrent.Data, err = mmapTorrentData(metaInfo, dataDir)
// metaInfo.Pieces is consumed in steps of PieceHash.Size() bytes, one hash per
// piece.
// NOTE(review): if len(metaInfo.Pieces) is not a multiple of the hash size,
// the slice expression on L498 would presumably panic before the length check
// on L499 can report the error -- confirm.
497 for offset := 0; offset < len(metaInfo.Pieces); offset += PieceHash.Size() {
498 hash := metaInfo.Pieces[offset : offset+PieceHash.Size()]
499 if len(hash) != PieceHash.Size() {
500 err = errors.New("bad piece hash in metainfo")
504 copyHashSum(piece.Hash[:], hash)
505 torrent.Pieces = append(torrent.Pieces, piece)
// Every chunk of the piece just appended starts out pending.
506 torrent.pendAllChunkSpecs(peer_protocol.Integer(len(torrent.Pieces) - 1))
508 torrent.Trackers = make([][]tracker.Client, len(metaInfo.AnnounceList))
509 for tierIndex := range metaInfo.AnnounceList {
// tier is a local copy of the (initially nil) slice; it is stored back on L526.
510 tier := torrent.Trackers[tierIndex]
511 for _, url := range metaInfo.AnnounceList[tierIndex] {
512 tr, err := tracker.New(url)
517 tier = append(tier, tr)
519 // The trackers within each tier must be shuffled before use.
520 // http://stackoverflow.com/a/12267471/149482
521 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
// Each element is swapped with a randomly chosen element at or before it.
522 for i := range tier {
523 j := mathRand.Intn(i + 1)
524 tier[i], tier[j] = tier[j], tier[i]
526 torrent.Trackers[tierIndex] = tier
// AddTorrent creates a Torrent from metaInfo using the client's DataDir and
// registers it under its info hash. Unless trackers are disabled it starts
// announcing on a separate goroutine, and it queues every piece for a hash
// check.
531 func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
532 torrent, err := newTorrent(metaInfo, me.DataDir)
// If the torrent is already registered, the freshly created duplicate is
// closed and the result of Close is returned.
538 if _, ok := me.torrents[torrent.InfoHash]; ok {
539 return torrent.Close()
541 me.torrents[torrent.InfoHash] = torrent
542 if !me.DisableTrackers {
543 go me.announceTorrent(torrent)
545 for i := range torrent.Pieces {
546 me.queuePieceCheck(torrent, peer_protocol.Integer(i))
// listenerAnnouncePort returns the port of the listener address for use in
// tracker announces. addr is obtained on a line not shown (presumably from
// cl.Listener). Address types not handled by the switch are logged.
// NOTE(review): the result type is int16, so the conversions below cannot
// represent ports above 32767 -- confirm how callers treat the value.
551 func (cl *Client) listenerAnnouncePort() (port int16) {
557 switch data := addr.(type) {
559 return int16(data.Port)
561 return int16(data.Port)
563 log.Printf("unknown listener addr type: %T", addr)
// announceTorrent announces t to its trackers, tier by tier, and feeds the
// peers returned into the client via AddPeers. The enclosing loop and the
// error branches are on lines not shown.
568 func (cl *Client) announceTorrent(t *Torrent) {
// The request carries the Started event, our listening port, our peer ID and
// the torrent's info hash.
569 req := tracker.AnnounceRequest{
570 Event: tracker.Started,
572 Port: cl.listenerAnnouncePort(),
574 req.PeerId = cl.PeerId
575 req.InfoHash = t.InfoHash
578 for _, tier := range t.Trackers {
579 for trIndex, tr := range tier {
580 if err := tr.Connect(); err != nil {
584 resp, err := tr.Announce(&req)
// Convert the tracker's peers into this package's Peer type.
590 for _, peer := range resp.Peers {
591 peers = append(peers, Peer{
596 if err := cl.AddPeers(t.InfoHash, peers); err != nil {
// The tracker name is not logged yet; the literal "TODO" is printed instead.
600 log.Printf("%d new peers from %s", len(peers), "TODO")
// Move the tracker that answered to the front of its tier.
601 tier[0], tier[trIndex] = tier[trIndex], tier[0]
// Wait for the interval given by the tracker, in seconds.
602 time.Sleep(time.Second * time.Duration(resp.Interval))
// Presumably reached when no tracker answered: pause one second -- confirm.
606 time.Sleep(time.Second)
// allTorrentsCompleted checks each torrent with haveAllPieces. The return
// statements are not shown; presumably it returns false as soon as one torrent
// is incomplete and true otherwise.
610 func (cl *Client) allTorrentsCompleted() bool {
611 for _, t := range cl.torrents {
612 if !t.haveAllPieces() {
// WaitAll loops until allTorrentsCompleted reports true. How it waits between
// checks is on lines not shown.
619 func (me *Client) WaitAll() {
621 for !me.allTorrentsCompleted() {
// replenishConnRequests issues new chunk requests on conn: prioritized chunks
// first, then the pending chunks of partially downloaded pieces. If no
// requests are outstanding afterwards, interest in the peer is withdrawn.
627 func (me *Client) replenishConnRequests(torrent *Torrent, conn *Connection) {
628 requestHeatMap := torrent.requestHeat()
// addRequest tries to request req on conn. Its result (again) is used by the
// loops below to decide whether to keep going; the values returned from the
// skip branches are not shown.
629 addRequest := func(req Request) (again bool) {
630 piece := torrent.Pieces[req.Index]
632 // We can't be sure we want this.
635 if piece.Complete() {
636 // We already have this.
// A positive heat means the chunk has already been requested.
639 if requestHeatMap[req] > 0 {
640 // We've already requested this.
643 return conn.Request(req)
645 // First request prioritized chunks.
646 if torrent.Priorities != nil {
647 for e := torrent.Priorities.Front(); e != nil; e = e.Next() {
648 if !addRequest(e.Value.(Request)) {
653 // Then finish off incomplete pieces in order of bytes remaining.
654 for _, index := range torrent.piecesByPendingBytesDesc() {
// All bytes of this piece are still pending, i.e. nothing of it has been
// downloaded yet (the branch body is not shown).
655 if torrent.PieceNumPendingBytes(index) == torrent.PieceLength(index) {
658 for chunkSpec := range torrent.Pieces[index].PendingChunkSpecs {
659 if !addRequest(Request{index, chunkSpec}) {
664 if len(conn.Requests) == 0 {
665 conn.SetInterested(false)
// downloadedChunk handles a received Piece message: it writes the chunk to the
// torrent, removes it from the piece's pending set, queues a hash check once
// nothing is pending for the piece, removes the chunk from the priority list
// and signals that data is ready.
669 func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message) (err error) {
670 request := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
// Chunks that are not pending are logged (the rest of the branch is not shown;
// presumably it returns without writing).
671 if _, ok := torrent.Pieces[request.Index].PendingChunkSpecs[request.ChunkSpec]; !ok {
672 log.Printf("got unnecessary chunk: %s", request)
675 err = torrent.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
679 delete(torrent.Pieces[request.Index].PendingChunkSpecs, request.ChunkSpec)
680 if len(torrent.Pieces[request.Index].PendingChunkSpecs) == 0 {
681 me.queuePieceCheck(torrent, request.Index)
// next carries the successor across iterations so elements can be removed
// while iterating (its assignment is on a line not shown).
// NOTE(review): unlike pieceHashed and replenishConnRequests, no nil check on
// torrent.Priorities is visible before Front() is called -- confirm one
// exists, since Priorities is only created in PrioritizeDataRegion.
683 var next *list.Element
684 for e := torrent.Priorities.Front(); e != nil; e = next {
686 if e.Value.(Request) == request {
687 torrent.Priorities.Remove(e)
690 me.dataReady(DataSpec{torrent.InfoHash, request})
// dataReady is called when the data described by ds has become available. It
// only acts when a waiter channel exists; the action itself is on lines not
// shown.
694 func (cl *Client) dataReady(ds DataSpec) {
695 if cl.dataWaiter != nil {
// DataWaiter creates the client's dataWaiter channel if it does not exist yet.
// The return statement is not shown; presumably the channel is returned
// receive-only, per the signature.
701 func (cl *Client) DataWaiter() <-chan struct{} {
704 if cl.dataWaiter == nil {
705 cl.dataWaiter = make(chan struct{})
// pieceHashed processes the result of hashing a piece; correct says whether
// the computed hash matched. The branch structure around correct, and the
// assignment of p, are on lines not shown, so the grouping noted below is
// presumed.
710 func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct bool) {
// Presumably the success path: nothing is pending for the piece any more.
714 p.PendingChunkSpecs = nil
715 log.Printf("got piece %d, (%d/%d)", piece, t.NumPiecesCompleted(), t.NumPieces())
// Remove every prioritized request that belongs to this piece. next carries
// the successor so elements can be removed while iterating (its assignment is
// on a line not shown).
716 var next *list.Element
717 if t.Priorities != nil {
718 for e := t.Priorities.Front(); e != nil; e = next {
720 if e.Value.(Request).Index == piece {
721 t.Priorities.Remove(e)
// Signal that the whole piece (offset 0, full piece length) is ready.
725 me.dataReady(DataSpec{
728 peer_protocol.Integer(piece),
729 ChunkSpec{0, peer_protocol.Integer(t.PieceLength(piece))},
// Presumably the failure path: with nothing pending, all chunks of the piece
// are made pending again.
733 if len(p.PendingChunkSpecs) == 0 {
734 t.pendAllChunkSpecs(piece)
737 for _, conn := range t.Conns {
// Send a Have message for the piece on this connection (presumably only when
// the hash was correct).
739 conn.Post(peer_protocol.Message{
740 Type: peer_protocol.Have,
741 Index: peer_protocol.Integer(piece),
743 // TODO: Cancel requests for this piece.
// Top up requests on connections whose peer has this piece.
745 if conn.PeerHasPiece(piece) {
746 me.replenishConnRequests(t, conn)
// verifyPiece hashes piece index of t and reports to pieceHashed whether the
// sum equals the piece's expected hash. The queued flag is cleared before
// hashing. The assignment of p is on a line not shown (presumably
// t.Pieces[index]).
753 func (cl *Client) verifyPiece(t *Torrent, index peer_protocol.Integer) {
760 p.QueuedForHash = false
762 sum := t.HashPiece(index)
765 cl.pieceHashed(t, index, sum == p.Hash)
769 func (me *Client) Torrents() (ret []*Torrent) {
771 for _, t := range me.torrents {