"bufio"
"container/list"
"crypto/rand"
+ "crypto/sha1"
"errors"
"fmt"
+ "github.com/nsf/libtorgo/bencode"
"io"
"log"
mathRand "math/rand"
if t == nil {
return errors.New("no such active torrent")
}
+ if t.Info == nil {
+ return errors.New("missing metadata")
+ }
newPriorities := make([]request, 0, (len_+chunkSize-1)/chunkSize)
for len_ > 0 {
req, ok := t.offsetRequest(off)
cl.mu.Lock()
defer cl.mu.Unlock()
for _, t := range cl.torrents {
- fmt.Fprintf(w, "%s: %f%%\n", t.MetaInfo.Name, 100*(1-float32(t.BytesLeft())/float32(t.Length())))
+ fmt.Fprintf(w, "%s: %f%%\n", t.Name(), 100*(1-float32(t.BytesLeft())/float32(t.Length())))
t.WriteStatus(w)
}
}
err = errors.New("unknown torrent")
return
}
- index := pp.Integer(off / t.MetaInfo.PieceLength)
+ index := pp.Integer(off / t.Info.PieceLength())
// Reading outside the bounds of a file is an error.
if index < 0 {
err = os.ErrInvalid
PeerChoked: true,
write: make(chan []byte),
post: make(chan pp.Message),
- PeerMaxRequests: 250,
+ PeerMaxRequests: 64,
}
defer func() {
// There's a lock and deferred unlock later in this function. The
go conn.writer()
// go conn.writeOptimizer()
conn.write <- pp.Bytes(pp.Protocol)
- conn.write <- pp.Bytes("\x00\x00\x00\x00\x00\x00\x00\x00")
+ conn.write <- pp.Bytes("\x00\x00\x00\x00\x00\x10\x00\x00")
if torrent != nil {
conn.write <- pp.Bytes(torrent.InfoHash[:])
conn.write <- pp.Bytes(me.PeerId[:])
return
}
go conn.writeOptimizer(time.Minute)
+ if conn.PeerExtensions[5]&0x10 != 0 {
+ conn.Post(pp.Message{
+ Type: pp.Extended,
+ ExtendedID: pp.HandshakeExtendedID,
+ ExtendedPayload: func() []byte {
+ b, err := bencode.Marshal(map[string]interface{}{
+ "m": map[string]int{
+ "ut_metadata": 1,
+ },
+ })
+ if err != nil {
+ panic(err)
+ }
+ return b
+ }(),
+ })
+ }
if torrent.haveAnyPieces() {
conn.Post(pp.Message{
Type: pp.Bitfield,
return
}
-func (me *Client) peerGotPiece(torrent *torrent, conn *connection, piece int) {
- if conn.PeerPieces == nil {
- conn.PeerPieces = make([]bool, len(torrent.Pieces))
+func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) {
+ for piece >= len(c.PeerPieces) {
+ c.PeerPieces = append(c.PeerPieces, false)
}
- conn.PeerPieces[piece] = true
- if torrent.wantPiece(piece) {
- me.replenishConnRequests(torrent, conn)
+ c.PeerPieces[piece] = true
+ if t.wantPiece(piece) {
+ me.replenishConnRequests(t, c)
}
}
delete(cn.Requests, r)
}
+func (cl *Client) requestPendingMetadata(t *torrent, c *connection) {
+ var pending []int
+ for index, have := range t.MetaDataHave {
+ if !have {
+ pending = append(pending, index)
+ }
+ }
+ for _, i := range mathRand.Perm(len(pending)) {
+ c.Post(pp.Message{
+ Type: pp.Extended,
+ ExtendedID: byte(c.PeerExtensionIDs["ut_metadata"]),
+ ExtendedPayload: func() []byte {
+ b, err := bencode.Marshal(map[string]int{
+ "msg_type": 0,
+ "piece": pending[i],
+ })
+ if err != nil {
+ panic(err)
+ }
+ return b
+ }(),
+ })
+ }
+}
+
func (me *Client) connectionLoop(t *torrent, c *connection) error {
decoder := pp.Decoder{
R: bufio.NewReader(c.Socket),
log.Printf("received unexpected cancel: %v", req)
}
case pp.Bitfield:
- if len(msg.Bitfield) < t.NumPieces() {
- err = errors.New("received invalid bitfield")
- break
- }
if c.PeerPieces != nil {
err = errors.New("received unexpected bitfield")
break
}
- c.PeerPieces = msg.Bitfield[:t.NumPieces()]
+ if t.haveInfo() {
+ if len(msg.Bitfield) < t.NumPieces() {
+ err = errors.New("received invalid bitfield")
+ break
+ }
+ msg.Bitfield = msg.Bitfield[:t.NumPieces()]
+ }
+ c.PeerPieces = msg.Bitfield
for index, has := range c.PeerPieces {
if has {
me.peerGotPiece(t, c, index)
}
case pp.Piece:
err = me.downloadedChunk(t, c, &msg)
+ case pp.Extended:
+ switch msg.ExtendedID {
+ case pp.HandshakeExtendedID:
+ var d map[string]interface{}
+ err = bencode.Unmarshal(msg.ExtendedPayload, &d)
+ if err != nil {
+ err = fmt.Errorf("error decoding extended message payload: %s", err)
+ break
+ }
+ m, ok := d["m"]
+ if !ok {
+ err = errors.New("handshake missing m item")
+ break
+ }
+ mTyped, ok := m.(map[string]interface{})
+ if !ok {
+ err = errors.New("handshake m value is not dict")
+ break
+ }
+ if c.PeerExtensionIDs == nil {
+ c.PeerExtensionIDs = make(map[string]int64, len(mTyped))
+ }
+ for name, v := range mTyped {
+ id, ok := v.(int64)
+ if !ok {
+ log.Printf("bad handshake m item extension ID type: %T", v)
+ continue
+ }
+ if id == 0 {
+ delete(c.PeerExtensionIDs, name)
+ } else {
+ c.PeerExtensionIDs[name] = id
+ }
+ }
+ metadata_sizeUntyped, ok := d["metadata_size"]
+ if ok {
+ metadata_size, ok := metadata_sizeUntyped.(int64)
+ if !ok {
+ log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
+ } else {
+ log.Printf("metadata_size: %d", metadata_size)
+ t.SetMetaDataSize(metadata_size)
+ }
+ }
+ if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
+ me.requestPendingMetadata(t, c)
+ }
+ case 1:
+ var d map[string]int
+ err := bencode.Unmarshal(msg.ExtendedPayload, &d)
+ if err != nil {
+ err = fmt.Errorf("error unmarshalling extended payload: %s", err)
+ break
+ }
+ if d["msg_type"] != 1 {
+ break
+ }
+ piece := d["piece"]
+ log.Println(piece, d["total_size"], len(msg.ExtendedPayload))
+ copy(t.MetaData[(1<<14)*piece:], msg.ExtendedPayload[len(msg.ExtendedPayload)-metadataPieceSize(d["total_size"], piece):])
+ t.MetaDataHave[piece] = true
+ if !t.GotAllMetadataPieces() {
+ break
+ }
+ log.Printf("%q", t.MetaData)
+ h := sha1.New()
+ h.Write(t.MetaData)
+ var ih InfoHash
+ copy(ih[:], h.Sum(nil)[:])
+ if ih != t.InfoHash {
+ panic(ih)
+ }
+ }
default:
err = fmt.Errorf("received unknown message type: %#v", msg.Type)
}
return nil
}
-// Prepare a Torrent without any attachment to a Client. That means we can
-// initialize fields all fields that don't require the Client without locking
-// it.
-func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (t *torrent, err error) {
- t = &torrent{
- InfoHash: BytesInfoHash(metaInfo.InfoHash),
- MetaInfo: metaInfo,
- }
- t.Data, err = mmapTorrentData(metaInfo, dataDir)
+func (cl *Client) setMetaData(t *torrent, md MetaData) (err error) {
+ t.Data, err = mmapTorrentData(md, cl.DataDir)
if err != nil {
return
}
- for offset := 0; offset < len(metaInfo.Pieces); offset += pieceHash.Size() {
- hash := metaInfo.Pieces[offset : offset+pieceHash.Size()]
- if len(hash) != pieceHash.Size() {
- err = errors.New("bad piece hash in metainfo")
- return
- }
+ for _, hash := range md.PieceHashes() {
piece := &piece{}
- copyHashSum(piece.Hash[:], hash)
+ copyHashSum(piece.Hash[:], []byte(hash))
t.Pieces = append(t.Pieces, piece)
t.pendAllChunkSpecs(pp.Integer(len(t.Pieces) - 1))
}
- t.Trackers = make([][]tracker.Client, len(metaInfo.AnnounceList))
- for tierIndex := range metaInfo.AnnounceList {
+ t.Priorities = list.New()
+
+ // Queue all pieces for hashing. This is done sequentially to avoid
+ // spamming goroutines.
+ for _, p := range t.Pieces {
+ p.QueuedForHash = true
+ }
+ go func() {
+ for i := range t.Pieces {
+ cl.verifyPiece(t, pp.Integer(i))
+ }
+ }()
+
+ cl.DownloadStrategy.TorrentStarted(t)
+ return
+}
+
+// Prepare a Torrent without any attachment to a Client. That means we can
+// initialize fields all fields that don't require the Client without locking
+// it.
+func newTorrent(ih InfoHash, announceList [][]string) (t *torrent, err error) {
+ t = &torrent{
+ InfoHash: ih,
+ }
+ t.Trackers = make([][]tracker.Client, len(announceList))
+ for tierIndex := range announceList {
tier := t.Trackers[tierIndex]
- for _, url := range metaInfo.AnnounceList[tierIndex] {
+ for _, url := range announceList[tierIndex] {
tr, err := tracker.New(url)
if err != nil {
log.Print(err)
return
}
-// Adds the torrent to the client.
-func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
- torrent, err := newTorrent(metaInfo, me.DataDir)
+func (cl *Client) AddMagnet(uri string) (err error) {
+ m, err := ParseMagnetURI(uri)
if err != nil {
- return err
+ return
}
- me.mu.Lock()
- defer me.mu.Unlock()
- if _, ok := me.torrents[torrent.InfoHash]; ok {
- return torrent.Close()
+ t, err := newTorrent(m.InfoHash, [][]string{m.Trackers})
+ if err != nil {
+ return
}
- me.torrents[torrent.InfoHash] = torrent
- me.DownloadStrategy.TorrentStarted(torrent)
- if !me.DisableTrackers {
- go me.announceTorrent(torrent)
+ t.DisplayName = m.DisplayName
+ cl.mu.Lock()
+ defer cl.mu.Unlock()
+ err = cl.addTorrent(t)
+ if err != nil {
+ t.Close()
}
- torrent.Priorities = list.New()
+ return
+}
- // Queue all pieces for hashing. This is done sequentially to avoid
- // spamming goroutines.
- for _, p := range torrent.Pieces {
- p.QueuedForHash = true
+func (me *Client) addTorrent(t *torrent) (err error) {
+ if _, ok := me.torrents[t.InfoHash]; ok {
+ err = fmt.Errorf("torrent infohash collision")
+ return
}
- go func() {
- for i := range torrent.Pieces {
- me.verifyPiece(torrent, pp.Integer(i))
- }
- }()
+ me.torrents[t.InfoHash] = t
+ if !me.DisableTrackers {
+ go me.announceTorrent(t)
+ }
+ return
+}
- return nil
+// Adds the torrent to the client.
+func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) (err error) {
+ t, err := newTorrent(BytesInfoHash(metaInfo.InfoHash), metaInfo.AnnounceList)
+ if err != nil {
+ return
+ }
+ err = me.addTorrent(t)
+ if err != nil {
+ return
+ }
+ err = me.setMetaData(t, metaInfoMetaData{metaInfo})
+ if err != nil {
+ return
+ }
+ return
}
func (cl *Client) listenerAnnouncePort() (port int16) {
// Do we actually want this chunk?
if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
+ log.Printf("got unnecessary chunk from %v: %q", req, string(c.PeerId[:]))
return nil
}
"container/list"
"fmt"
"io"
+ "log"
"net"
"sort"
"bitbucket.org/anacrolix/go.torrent/mmap_span"
pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
"bitbucket.org/anacrolix/go.torrent/tracker"
- metainfo "github.com/nsf/libtorgo/torrent"
)
func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) {
InfoHash InfoHash
Pieces []*piece
Data mmap_span.MMapSpan
- MetaInfo *metainfo.MetaInfo
+ Info MetaData
Conns []*connection
Peers []Peer
Priorities *list.List
// mirror their respective URLs from the announce-list key.
Trackers [][]tracker.Client
lastReadPiece int
+ DisplayName string
+ MetaData []byte
+ MetaDataHave []bool
+}
+
+func (t *torrent) GotAllMetadataPieces() bool {
+ if t.MetaDataHave == nil {
+ return false
+ }
+ for _, have := range t.MetaDataHave {
+ if !have {
+ return false
+ }
+ }
+ return true
+}
+
+func (t *torrent) SetMetaDataSize(bytes int64) {
+ if t.MetaData != nil {
+ if len(t.MetaData) != int(bytes) {
+ log.Printf("new metadata_size differs")
+ }
+ return
+ }
+ t.MetaData = make([]byte, bytes)
+ t.MetaDataHave = make([]bool, (bytes+(1<<14)-1)/(1<<14))
+}
+
+func (t *torrent) Name() string {
+ if t.Info == nil {
+ return t.DisplayName
+ }
+ return t.Info.Name()
}
func (t *torrent) pieceStatusChar(index int) byte {
}
func (t *torrent) String() string {
- return t.MetaInfo.Name
+ return t.Name()
+}
+
+func (t *torrent) haveInfo() bool {
+ return t.Info != nil
}
func (t *torrent) BytesLeft() (left int64) {
+ if !t.haveInfo() {
+ return -1
+ }
for i := pp.Integer(0); i < pp.Integer(t.NumPieces()); i++ {
left += int64(t.PieceNumPendingBytes(i))
}
}
func (t *torrent) UsualPieceSize() int {
- return int(t.MetaInfo.PieceLength)
+ return int(t.Info.PieceLength())
}
func (t *torrent) LastPieceSize() int {
}
func (t *torrent) NumPieces() int {
- return len(t.MetaInfo.Pieces) / pieceHash.Size()
+ return t.Info.PieceCount()
}
func (t *torrent) NumPiecesCompleted() (num int) {
}
func (t *torrent) requestOffset(r request) int64 {
- return torrentRequestOffset(t.Length(), t.MetaInfo.PieceLength, r)
+ return torrentRequestOffset(t.Length(), t.Info.PieceLength(), r)
}
// Return the request that would include the given offset into the torrent data.
func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
- return torrentOffsetRequest(t.Length(), t.MetaInfo.PieceLength, chunkSize, off)
+ return torrentOffsetRequest(t.Length(), t.Info.PieceLength(), chunkSize, off)
}
func (t *torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
- _, err = t.Data.WriteAt(data, int64(piece)*t.MetaInfo.PieceLength+begin)
+ _, err = t.Data.WriteAt(data, int64(piece)*t.Info.PieceLength()+begin)
return
}
if piece.PendingChunkSpecs == nil {
piece.PendingChunkSpecs = make(
map[chunkSpec]struct{},
- (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
+ (t.Info.PieceLength()+chunkSize-1)/chunkSize)
}
c := chunkSpec{
Begin: 0,
func (t *torrent) PieceLength(piece pp.Integer) (len_ pp.Integer) {
if int(piece) == t.NumPieces()-1 {
- len_ = pp.Integer(t.Data.Size() % t.MetaInfo.PieceLength)
+ len_ = pp.Integer(t.Data.Size() % t.Info.PieceLength())
}
if len_ == 0 {
- len_ = pp.Integer(t.MetaInfo.PieceLength)
+ len_ = pp.Integer(t.Info.PieceLength())
}
return
}
func (t *torrent) HashPiece(piece pp.Integer) (ps pieceSum) {
hash := pieceHash.New()
- n, err := t.Data.WriteSectionTo(hash, int64(piece)*t.MetaInfo.PieceLength, t.MetaInfo.PieceLength)
+ n, err := t.Data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength(), t.Info.PieceLength())
if err != nil {
panic(err)
}
return
}
func (t *torrent) haveAllPieces() bool {
+ if t.Info == nil {
+ return false
+ }
for _, piece := range t.Pieces {
if !piece.Complete() {
return false
}
func (t *torrent) wantPiece(index int) bool {
+ if !t.haveInfo() {
+ return false
+ }
p := t.Pieces[index]
return p.EverHashed && len(p.PendingChunkSpecs) != 0
}