4 "bitbucket.org/anacrolix/go.torrent/peer_protocol"
11 metainfo "github.com/nsf/libtorgo/torrent"
13 "launchpad.net/gommap"
21 PieceHash = crypto.SHA1
23 chunkSize = 0x4000 // 16KiB
26 type InfoHash [20]byte
28 type pieceSum [20]byte
30 func copyHashSum(dst, src []byte) {
31 if len(dst) != len(src) || copy(dst, src) != len(dst) {
32 panic("hash sum sizes differ")
36 func BytesInfoHash(b []byte) (ih InfoHash) {
37 if len(b) != len(ih) || copy(ih[:], b) != len(ih) {
38 panic("bad infohash bytes")
46 pieceStateUnknown = iota
54 PendingChunkSpecs map[chunkSpec]struct{}
57 type chunkSpec struct {
58 Begin, Length peer_protocol.Integer
62 Index peer_protocol.Integer
66 type connection struct {
68 post chan encoding.BinaryMarshaler
73 Requests map[request]struct{}
78 PeerRequests map[request]struct{}
79 PeerExtensions [8]byte
83 func (c *connection) PeerHasPiece(index int) bool {
84 if c.PeerPieces == nil {
87 return c.PeerPieces[index]
90 func (c *connection) Post(msg encoding.BinaryMarshaler) {
94 func (c *connection) Request(chunk request) bool {
95 if len(c.Requests) >= maxRequests {
98 if _, ok := c.Requests[chunk]; !ok {
99 c.Post(peer_protocol.Message{
100 Type: peer_protocol.Request,
103 Length: chunk.Length,
106 if c.Requests == nil {
107 c.Requests = make(map[request]struct{}, maxRequests)
109 c.Requests[chunk] = struct{}{}
113 func (c *connection) SetInterested(interested bool) {
114 if c.Interested == interested {
117 c.Post(peer_protocol.Message{
118 Type: func() peer_protocol.MessageType {
120 return peer_protocol.Interested
122 return peer_protocol.NotInterested
126 c.Interested = interested
129 func (conn *connection) writer() {
132 n, err := conn.Socket.Write(b)
139 panic("didn't write all bytes")
141 log.Printf("wrote %#v", string(b))
145 func (conn *connection) writeOptimizer() {
146 pending := list.New()
150 if pending.Len() == 0 {
154 nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
160 case msg := <-conn.post:
161 pending.PushBack(msg)
162 case write <- nextWrite:
163 pending.Remove(pending.Front())
168 type torrent struct {
172 MetaInfo *metainfo.MetaInfo
177 func (t *torrent) bitfield() (bf []bool) {
178 for _, p := range t.Pieces {
179 bf = append(bf, p.State == pieceStateComplete)
184 func (t *torrent) pieceChunkSpecs(index int) (cs map[chunkSpec]struct{}) {
185 cs = make(map[chunkSpec]struct{}, (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
189 for left := peer_protocol.Integer(t.PieceSize(index)); left > 0; left -= c.Length {
191 if c.Length > chunkSize {
200 func (t *torrent) requestHeat() (ret map[request]int) {
201 ret = make(map[request]int)
202 for _, conn := range t.Conns {
203 for req, _ := range conn.Requests {
216 func (t *torrent) PieceSize(piece int) (size int64) {
217 if piece == len(t.Pieces)-1 {
218 size = t.Data.Size() % t.MetaInfo.PieceLength
221 size = t.MetaInfo.PieceLength
226 func (t *torrent) PieceReader(piece int) io.Reader {
227 return io.NewSectionReader(t.Data, int64(piece)*t.MetaInfo.PieceLength, t.MetaInfo.PieceLength)
230 func (t *torrent) HashPiece(piece int) (ps pieceSum) {
231 hash := PieceHash.New()
232 n, err := io.Copy(hash, t.PieceReader(piece))
236 if n != t.PieceSize(piece) {
237 panic("hashed wrong number of bytes")
239 copyHashSum(ps[:], hash.Sum(nil))
243 // func (t *torrent) bitfield
251 torrents map[InfoHash]*torrent
253 noTorrents chan struct{}
254 addTorrent chan *torrent
255 torrentFinished chan InfoHash
256 actorTask chan func()
259 func NewClient(dataDir string) *client {
264 torrents: make(map[InfoHash]*torrent),
266 noTorrents: make(chan struct{}),
267 addTorrent: make(chan *torrent),
268 torrentFinished: make(chan InfoHash),
269 actorTask: make(chan func()),
271 _, err := rand.Read(c.PeerId[:])
273 panic("error generating peer id")
279 func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan, err error) {
286 for _, miFile := range metaInfo.Files {
287 fileName := filepath.Join(append([]string{location, metaInfo.Name}, miFile.Path...)...)
288 err = os.MkdirAll(filepath.Dir(fileName), 0666)
293 file, err = os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666)
300 fi, err = file.Stat()
304 if fi.Size() < miFile.Length {
305 err = file.Truncate(miFile.Length)
311 mMap, err = gommap.MapRegion(file.Fd(), 0, miFile.Length, gommap.PROT_READ|gommap.PROT_WRITE, gommap.MAP_SHARED)
315 if int64(len(mMap)) != miFile.Length {
316 panic("mmap has wrong length")
318 mms = append(mms, MMap{mMap})
327 func (me *client) torrent(ih InfoHash) *torrent {
328 for _, t := range me.torrents {
329 if t.InfoHash == ih {
336 func (me *client) initiateConn(peer Peer, torrent *torrent) {
337 if peer.Id == me.PeerId {
342 conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{
346 me.withContext(func() {
351 log.Printf("error connecting to peer: %s", err)
354 log.Printf("connected to %s", conn.RemoteAddr())
355 me.handshake(conn, torrent, peer.Id)
359 func (me *torrent) haveAnyPieces() bool {
360 for _, piece := range me.Pieces {
361 if piece.State == pieceStateComplete {
368 func (me *client) handshake(sock net.Conn, torrent *torrent, peerId [20]byte) {
373 write: make(chan []byte),
374 post: make(chan encoding.BinaryMarshaler),
377 go conn.writeOptimizer()
378 conn.post <- peer_protocol.Bytes(peer_protocol.Protocol)
379 conn.post <- peer_protocol.Bytes("\x00\x00\x00\x00\x00\x00\x00\x00")
381 conn.post <- peer_protocol.Bytes(torrent.InfoHash[:])
382 conn.post <- peer_protocol.Bytes(me.PeerId[:])
385 _, err := io.ReadFull(conn.Socket, b[:])
389 if string(b[:20]) != peer_protocol.Protocol {
390 log.Printf("wrong protocol: %#v", string(b[:20]))
393 if 8 != copy(conn.PeerExtensions[:], b[20:]) {
396 log.Printf("peer extensions: %#v", string(conn.PeerExtensions[:]))
397 var infoHash [20]byte
398 _, err = io.ReadFull(conn.Socket, infoHash[:])
402 _, err = io.ReadFull(conn.Socket, conn.PeerId[:])
407 torrent = me.torrent(infoHash)
411 conn.post <- peer_protocol.Bytes(torrent.InfoHash[:])
412 conn.post <- peer_protocol.Bytes(me.PeerId[:])
414 me.withContext(func() {
415 me.addConnection(torrent, conn)
416 if torrent.haveAnyPieces() {
417 conn.Post(peer_protocol.Message{
418 Type: peer_protocol.Bitfield,
419 Bitfield: torrent.bitfield(),
423 defer me.withContext(func() {
424 me.dropConnection(torrent, conn)
426 err := me.runConnection(torrent, conn)
434 func (me *client) peerGotPiece(torrent *torrent, conn *connection, piece int) {
435 if conn.PeerPieces == nil {
436 conn.PeerPieces = make([]bool, len(torrent.Pieces))
438 conn.PeerPieces[piece] = true
439 if torrent.wantPiece(piece) {
440 conn.SetInterested(true)
441 me.replenishConnRequests(torrent, conn)
445 func (t *torrent) wantPiece(index int) bool {
446 return t.Pieces[index].State == pieceStateIncomplete
449 func (me *client) peerUnchoked(torrent *torrent, conn *connection) {
450 me.replenishConnRequests(torrent, conn)
453 func (me *client) runConnection(torrent *torrent, conn *connection) error {
454 decoder := peer_protocol.Decoder{
455 R: bufio.NewReader(conn.Socket),
456 MaxLength: 256 * 1024,
459 msg := new(peer_protocol.Message)
460 err := decoder.Decode(msg)
467 go me.withContext(func() {
471 case peer_protocol.Choke:
472 conn.PeerChoked = true
473 case peer_protocol.Unchoke:
474 conn.PeerChoked = false
475 me.peerUnchoked(torrent, conn)
476 case peer_protocol.Interested:
477 conn.PeerInterested = true
478 case peer_protocol.NotInterested:
479 conn.PeerInterested = false
480 case peer_protocol.Have:
481 me.peerGotPiece(torrent, conn, int(msg.Index))
482 case peer_protocol.Request:
483 conn.PeerRequests[request{
485 chunkSpec: chunkSpec{msg.Begin, msg.Length},
487 case peer_protocol.Bitfield:
488 if len(msg.Bitfield) < len(torrent.Pieces) {
489 err = errors.New("received invalid bitfield")
492 if conn.PeerPieces != nil {
493 err = errors.New("received unexpected bitfield")
496 conn.PeerPieces = msg.Bitfield[:len(torrent.Pieces)]
497 for index, has := range conn.PeerPieces {
499 me.peerGotPiece(torrent, conn, index)
503 log.Printf("received unknown message type: %#v", msg.Type)
507 me.dropConnection(torrent, conn)
513 func (me *client) dropConnection(torrent *torrent, conn *connection) {
515 for i0, c := range torrent.Conns {
519 i1 := len(torrent.Conns) - 1
521 torrent.Conns[i0] = torrent.Conns[i1]
523 torrent.Conns = torrent.Conns[:i1]
526 panic("no such connection")
529 func (me *client) addConnection(t *torrent, c *connection) bool {
530 for _, c := range t.Conns {
531 if c.PeerId == c.PeerId {
535 t.Conns = append(t.Conns, c)
539 func (me *client) openNewConns() {
540 for _, t := range me.torrents {
541 for len(t.Peers) != 0 {
542 if me.halfOpen >= me.HalfOpenLimit {
546 t.Peers = t.Peers[1:]
547 me.initiateConn(p, t)
552 func (me *client) AddPeers(infoHash InfoHash, peers []Peer) (err error) {
553 me.withContext(func() {
554 t := me.torrent(infoHash)
556 err = errors.New("no such torrent")
559 t.Peers = append(t.Peers, peers...)
565 func (me *client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
567 InfoHash: BytesInfoHash(metaInfo.InfoHash),
569 for offset := 0; offset < len(metaInfo.Pieces); offset += PieceHash.Size() {
570 hash := metaInfo.Pieces[offset : offset+PieceHash.Size()]
571 if len(hash) != PieceHash.Size() {
572 return errors.New("bad piece hash in metainfo")
575 copyHashSum(piece.Hash[:], hash)
576 torrent.Pieces = append(torrent.Pieces, piece)
579 torrent.Data, err = mmapTorrentData(metaInfo, me.DataDir)
583 torrent.MetaInfo = metaInfo
584 me.addTorrent <- torrent
588 func (me *client) WaitAll() {
592 func (me *client) Close() {
595 func (me *client) withContext(f func()) {
599 func (me *client) replenishConnRequests(torrent *torrent, conn *connection) {
600 if len(conn.Requests) >= maxRequests {
606 requestHeatMap := torrent.requestHeat()
607 for index, has := range conn.PeerPieces {
611 for chunkSpec, _ := range torrent.Pieces[index].PendingChunkSpecs {
612 request := request{peer_protocol.Integer(index), chunkSpec}
613 if heat := requestHeatMap[request]; heat > 0 {
616 conn.SetInterested(true)
617 if !conn.Request(request) {
622 //conn.SetInterested(false)
626 func (me *client) pieceHashed(ih InfoHash, piece int, correct bool) {
627 torrent := me.torrents[ih]
628 newState := func() pieceState {
630 return pieceStateComplete
632 return pieceStateIncomplete
635 oldState := torrent.Pieces[piece].State
636 if newState == oldState {
639 torrent.Pieces[piece].State = newState
640 if newState == pieceStateIncomplete {
641 torrent.Pieces[piece].PendingChunkSpecs = torrent.pieceChunkSpecs(piece)
643 for _, conn := range torrent.Conns {
645 conn.Post(peer_protocol.Message{
646 Type: peer_protocol.Have,
647 Index: peer_protocol.Integer(piece),
650 if conn.PeerHasPiece(piece) {
651 me.replenishConnRequests(torrent, conn)
658 func (me *client) run() {
660 noTorrents := me.noTorrents
661 if len(me.torrents) != 0 {
665 case noTorrents <- struct{}{}:
666 case torrent := <-me.addTorrent:
667 if _, ok := me.torrents[torrent.InfoHash]; ok {
670 me.torrents[torrent.InfoHash] = torrent
672 for _piece := range torrent.Pieces {
674 sum := torrent.HashPiece(piece)
675 me.withContext(func() {
676 me.pieceHashed(torrent.InfoHash, piece, sum == torrent.Pieces[piece].Hash)
680 case infoHash := <-me.torrentFinished:
681 delete(me.torrents, infoHash)
682 case task := <-me.actorTask: