import (
"bitbucket.org/anacrolix/go.torrent/peer_protocol"
+ "bitbucket.org/anacrolix/go.torrent/tracker"
+ _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
"bufio"
"container/list"
"crypto"
"io"
"launchpad.net/gommap"
"log"
+ mathRand "math/rand"
"net"
"os"
"path/filepath"
Conns []*Connection
Peers []Peer
Priorities *list.List
+ // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
+ // mirror their respective URLs from the announce-list key.
+ Trackers [][]tracker.Client
+}
+
+func (t *Torrent) Length() int64 {
+ return int64(t.PieceLength(peer_protocol.Integer(len(t.Pieces)-1))) + int64(len(t.Pieces)-1)*int64(t.PieceLength(0))
}
func (t *Torrent) Close() (err error) {
return slice.Indices
}
+// Currently doesn't really queue, but should in the future.
func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) {
piece := t.Pieces[pieceIndex]
if piece.Hashing {
}
type Peer struct {
- Id [20]byte
- IP net.IP
- Port int
+ Id [20]byte
+ tracker.Peer
}
func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) {
}()
}
+func (t *Torrent) haveAllPieces() bool {
+ for _, piece := range t.Pieces {
+ if !piece.Complete() {
+ return false
+ }
+ }
+ return true
+}
+
func (me *Torrent) haveAnyPieces() bool {
for _, piece := range me.Pieces {
if piece.Complete() {
conn.post <- peer_protocol.Bytes(me.PeerId[:])
}
me.mu.Lock()
- me.addConnection(torrent, conn)
+ defer me.mu.Unlock()
+ if !me.addConnection(torrent, conn) {
+ return
+ }
if torrent.haveAnyPieces() {
conn.Post(peer_protocol.Message{
Type: peer_protocol.Bitfield,
err = fmt.Errorf("during Connection loop: %s", err)
}
me.dropConnection(torrent, conn)
- me.mu.Unlock()
return
}
}
func (me *Client) addConnection(t *Torrent, c *Connection) bool {
- for _, c := range t.Conns {
- if c.PeerId == c.PeerId {
+ for _, c0 := range t.Conns {
+ if c.PeerId == c0.PeerId {
+ log.Printf("%s and %s have the same ID: %s", c.Socket.RemoteAddr(), c0.Socket.RemoteAddr(), c.PeerId)
return false
}
}
return nil
}
-func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
- torrent := &Torrent{
+// Prepare a Torrent without any attachment to a Client. That means we can
+// initialize fields all fields that don't require the Client without locking
+// it.
+func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (torrent *Torrent, err error) {
+ torrent = &Torrent{
InfoHash: BytesInfoHash(metaInfo.InfoHash),
MetaInfo: metaInfo,
}
for offset := 0; offset < len(metaInfo.Pieces); offset += PieceHash.Size() {
hash := metaInfo.Pieces[offset : offset+PieceHash.Size()]
if len(hash) != PieceHash.Size() {
- return errors.New("bad piece hash in metainfo")
+ err = errors.New("bad piece hash in metainfo")
+ return
}
piece := &piece{}
copyHashSum(piece.Hash[:], hash)
torrent.Pieces = append(torrent.Pieces, piece)
}
- var err error
- torrent.Data, err = mmapTorrentData(metaInfo, me.DataDir)
+ torrent.Data, err = mmapTorrentData(metaInfo, dataDir)
+ if err != nil {
+ return
+ }
+ torrent.Trackers = make([][]tracker.Client, len(metaInfo.AnnounceList))
+ for tierIndex := range metaInfo.AnnounceList {
+ tier := torrent.Trackers[tierIndex]
+ for _, url := range metaInfo.AnnounceList[tierIndex] {
+ tr, err := tracker.New(url)
+ if err != nil {
+ log.Print(err)
+ continue
+ }
+ tier = append(tier, tr)
+ }
+ // The trackers within each tier must be shuffled before use.
+ // http://stackoverflow.com/a/12267471/149482
+ // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
+ for i := range tier {
+ j := mathRand.Intn(i + 1)
+ tier[i], tier[j] = tier[j], tier[i]
+ }
+ torrent.Trackers[tierIndex] = tier
+ }
+ return
+}
+
+func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
+ torrent, err := newTorrent(metaInfo, me.DataDir)
if err != nil {
return err
}
return torrent.Close()
}
me.torrents[torrent.InfoHash] = torrent
+ go me.announceTorrent(torrent)
+ go me.PrioritizeDataRegion(torrent.InfoHash, 0, torrent.Length())
+ // for i := range torrent.Pieces {
+ // me.queuePieceCheck(torrent, peer_protocol.Integer(i))
+ // }
return nil
}
+func (cl *Client) announceTorrent(t *Torrent) {
+ req := tracker.AnnounceRequest{
+ Event: tracker.Started,
+ NumWant: -1,
+ }
+ req.PeerId = cl.PeerId
+ req.InfoHash = t.InfoHash
+newAnnounce:
+ for {
+ for _, tier := range t.Trackers {
+ for trIndex, tr := range tier {
+ if err := tr.Connect(); err != nil {
+ log.Print(err)
+ continue
+ }
+ resp, err := tr.Announce(&req)
+ if err != nil {
+ log.Print(err)
+ continue
+ }
+ var peers []Peer
+ for _, peer := range resp.Peers {
+ peers = append(peers, Peer{
+ Peer: peer,
+ })
+ }
+ if err := cl.AddPeers(t.InfoHash, peers); err != nil {
+ log.Print(err)
+ return
+ }
+ log.Printf("%d new peers from %s", len(peers), "TODO")
+ tier[0], tier[trIndex] = tier[trIndex], tier[0]
+ time.Sleep(time.Second * time.Duration(resp.Interval))
+ continue newAnnounce
+ }
+ }
+ time.Sleep(time.Second)
+ }
+}
+
+func (cl *Client) allTorrentsCompleted() bool {
+ for _, t := range cl.torrents {
+ if !t.haveAllPieces() {
+ return false
+ }
+ }
+ return true
+}
+
func (me *Client) WaitAll() {
me.mu.Lock()
- for len(me.torrents) != 0 {
+ for !me.allTorrentsCompleted() {
me.event.Wait()
}
me.mu.Unlock()
}
}
}
+ me.event.Broadcast()
}
func (cl *Client) verifyPiece(t *Torrent, index peer_protocol.Integer) {
"bytes"
"crypto/rand"
"encoding/binary"
- "encoding/hex"
"io"
"net"
+ "sync"
"syscall"
"testing"
)
+// Ensure net.IPs are stored big-endian, to match the way they're read from
+// the wire.
func TestNetIPv4Bytes(t *testing.T) {
ip := net.IP([]byte{127, 0, 0, 1})
if ip.String() != "127.0.0.1" {
Event: tracker.Started,
}
rand.Read(req.PeerId[:])
- n, err := hex.Decode(req.InfoHash[:], []byte("c833bb2b5e7bcb9c07f4c020b4be430c28ba7cdb"))
- if err != nil {
- t.Fatal(err)
- }
- if n != len(req.InfoHash) {
- panic("nope")
- }
+ copy(req.InfoHash[:], []uint8{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1})
+ // TODO: Find out what torrent this info hash corresponds to.
+ // n, err := hex.Decode(req.InfoHash[:], []byte("c833bb2b5e7bcb9c07f4c020b4be430c28ba7cdb"))
+ // if err != nil {
+ // t.Fatal(err)
+ // }
+ // if n != len(req.InfoHash) {
+ // panic("nope")
+ // }
resp, err := tr.Announce(&req)
if err != nil {
t.Fatal(err)
}
t.Log(resp)
}
+
+func TestAnnounceRandomInfoHash(t *testing.T) {
+ wg := sync.WaitGroup{}
+ for _, url := range []string{
+ "udp://tracker.openbittorrent.com:80/announce",
+ "udp://tracker.publicbt.com:80",
+ "udp://tracker.istole.it:6969",
+ "udp://tracker.ccc.de:80",
+ "udp://tracker.open.demonii.com:1337",
+ } {
+ go func(url string) {
+ defer wg.Done()
+ tr, err := tracker.New(url)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err := tr.Connect(); err != nil {
+ t.Log(err)
+ return
+ }
+ req := tracker.AnnounceRequest{
+ Event: tracker.Stopped,
+ }
+ rand.Read(req.PeerId[:])
+ rand.Read(req.InfoHash[:])
+ resp, err := tr.Announce(&req)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if resp.Leechers != 0 || resp.Seeders != 0 || len(resp.Peers) != 0 {
+ t.Fatal(resp)
+ }
+ }(url)
+ wg.Add(1)
+ }
+ wg.Wait()
+}