From 31530899e41acd78af1582cf2e37e8aabd78e0c1 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 17 Mar 2014 02:30:10 +1100 Subject: [PATCH] Make use of trackers --- client.go | 134 ++++++++++++++++++++++++++++---- cmd/torrent/main.go | 9 ++- tracker/udp/udp_tracker_test.go | 57 ++++++++++++-- 3 files changed, 176 insertions(+), 24 deletions(-) diff --git a/client.go b/client.go index 2b0b6d15..f874daba 100644 --- a/client.go +++ b/client.go @@ -2,6 +2,8 @@ package torrent import ( "bitbucket.org/anacrolix/go.torrent/peer_protocol" + "bitbucket.org/anacrolix/go.torrent/tracker" + _ "bitbucket.org/anacrolix/go.torrent/tracker/udp" "bufio" "container/list" "crypto" @@ -13,6 +15,7 @@ import ( "io" "launchpad.net/gommap" "log" + mathRand "math/rand" "net" "os" "path/filepath" @@ -222,6 +225,13 @@ type Torrent struct { Conns []*Connection Peers []Peer Priorities *list.List + // BEP 12 Multitracker Metadata Extension. The tracker.Client instances + // mirror their respective URLs from the announce-list key. + Trackers [][]tracker.Client +} + +func (t *Torrent) Length() int64 { + return int64(t.PieceLength(peer_protocol.Integer(len(t.Pieces)-1))) + int64(len(t.Pieces)-1)*int64(t.PieceLength(0)) } func (t *Torrent) Close() (err error) { @@ -261,6 +271,7 @@ func (t *Torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) { return slice.Indices } +// Currently doesn't really queue, but should in the future. func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) { piece := t.Pieces[pieceIndex] if piece.Hashing { @@ -346,9 +357,8 @@ func (t *Torrent) requestHeat() (ret map[Request]int) { } type Peer struct { - Id [20]byte - IP net.IP - Port int + Id [20]byte + tracker.Peer } func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) { @@ -536,6 +546,15 @@ func (me *Client) initiateConn(peer Peer, torrent *Torrent) { }() } +func (t *Torrent) haveAllPieces() bool { + for _, piece := range t.Pieces { + if !piece.Complete() { + return false + } + } + return true +} + func (me *Torrent) haveAnyPieces() bool { for _, piece := range me.Pieces { if piece.Complete() { @@ -594,7 +613,10 @@ func (me *Client) runConnection(sock net.Conn, torrent *Torrent, peerId [20]byte conn.post <- peer_protocol.Bytes(me.PeerId[:]) } me.mu.Lock() - me.addConnection(torrent, conn) + defer me.mu.Unlock() + if !me.addConnection(torrent, conn) { + return + } if torrent.haveAnyPieces() { conn.Post(peer_protocol.Message{ Type: peer_protocol.Bitfield, @@ -606,7 +628,6 @@ func (me *Client) runConnection(sock net.Conn, torrent *Torrent, peerId [20]byte err = fmt.Errorf("during Connection loop: %s", err) } me.dropConnection(torrent, conn) - me.mu.Unlock() return } @@ -714,8 +735,9 @@ func (me *Client) dropConnection(torrent *Torrent, conn *Connection) { } func (me *Client) addConnection(t *Torrent, c *Connection) bool { - for _, c := range t.Conns { - if c.PeerId == c.PeerId { + for _, c0 := range t.Conns { + if c.PeerId == c0.PeerId { + log.Printf("%s and %s have the same ID: %s", c.Socket.RemoteAddr(), c0.Socket.RemoteAddr(), c.PeerId) return false } } @@ -748,22 +770,53 @@ func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error { return nil } -func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error { - torrent := &Torrent{ +// Prepare a Torrent without any attachment to a Client. That means we can +// initialize fields all fields that don't require the Client without locking +// it. +func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (torrent *Torrent, err error) { + torrent = &Torrent{ InfoHash: BytesInfoHash(metaInfo.InfoHash), MetaInfo: metaInfo, } for offset := 0; offset < len(metaInfo.Pieces); offset += PieceHash.Size() { hash := metaInfo.Pieces[offset : offset+PieceHash.Size()] if len(hash) != PieceHash.Size() { - return errors.New("bad piece hash in metainfo") + err = errors.New("bad piece hash in metainfo") + return } piece := &piece{} copyHashSum(piece.Hash[:], hash) torrent.Pieces = append(torrent.Pieces, piece) } - var err error - torrent.Data, err = mmapTorrentData(metaInfo, me.DataDir) + torrent.Data, err = mmapTorrentData(metaInfo, dataDir) + if err != nil { + return + } + torrent.Trackers = make([][]tracker.Client, len(metaInfo.AnnounceList)) + for tierIndex := range metaInfo.AnnounceList { + tier := torrent.Trackers[tierIndex] + for _, url := range metaInfo.AnnounceList[tierIndex] { + tr, err := tracker.New(url) + if err != nil { + log.Print(err) + continue + } + tier = append(tier, tr) + } + // The trackers within each tier must be shuffled before use. + // http://stackoverflow.com/a/12267471/149482 + // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing + for i := range tier { + j := mathRand.Intn(i + 1) + tier[i], tier[j] = tier[j], tier[i] + } + torrent.Trackers[tierIndex] = tier + } + return +} + +func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error { + torrent, err := newTorrent(metaInfo, me.DataDir) if err != nil { return err } @@ -773,12 +826,66 @@ func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error { return torrent.Close() } me.torrents[torrent.InfoHash] = torrent + go me.announceTorrent(torrent) + go me.PrioritizeDataRegion(torrent.InfoHash, 0, torrent.Length()) + // for i := range torrent.Pieces { + // me.queuePieceCheck(torrent, peer_protocol.Integer(i)) + // } return nil } +func (cl *Client) announceTorrent(t *Torrent) { + req := tracker.AnnounceRequest{ + Event: tracker.Started, + NumWant: -1, + } + req.PeerId = cl.PeerId + req.InfoHash = t.InfoHash +newAnnounce: + for { + for _, tier := range t.Trackers { + for trIndex, tr := range tier { + if err := tr.Connect(); err != nil { + log.Print(err) + continue + } + resp, err := tr.Announce(&req) + if err != nil { + log.Print(err) + continue + } + var peers []Peer + for _, peer := range resp.Peers { + peers = append(peers, Peer{ + Peer: peer, + }) + } + if err := cl.AddPeers(t.InfoHash, peers); err != nil { + log.Print(err) + return + } + log.Printf("%d new peers from %s", len(peers), "TODO") + tier[0], tier[trIndex] = tier[trIndex], tier[0] + time.Sleep(time.Second * time.Duration(resp.Interval)) + continue newAnnounce + } + } + time.Sleep(time.Second) + } +} + +func (cl *Client) allTorrentsCompleted() bool { + for _, t := range cl.torrents { + if !t.haveAllPieces() { + return false + } + } + return true +} + func (me *Client) WaitAll() { me.mu.Lock() - for len(me.torrents) != 0 { + for !me.allTorrentsCompleted() { me.event.Wait() } me.mu.Unlock() @@ -900,6 +1007,7 @@ func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct b } } } + me.event.Broadcast() } func (cl *Client) verifyPiece(t *Torrent, index peer_protocol.Integer) { diff --git a/cmd/torrent/main.go b/cmd/torrent/main.go index f333d720..6f53f6c0 100644 --- a/cmd/torrent/main.go +++ b/cmd/torrent/main.go @@ -2,6 +2,7 @@ package main import ( "bitbucket.org/anacrolix/go.torrent" + "bitbucket.org/anacrolix/go.torrent/tracker" "flag" "fmt" metainfo "github.com/nsf/libtorgo/torrent" @@ -29,6 +30,7 @@ func main() { } client := torrent.Client{ DataDir: *downloadDir, + // HalfOpenLimit: 2, } client.Start() defer client.Stop() @@ -54,9 +56,10 @@ func main() { log.Fatal(err) } return []torrent.Peer{{ - IP: addr.IP, - Port: addr.Port, - }} + Peer: tracker.Peer{ + IP: addr.IP, + Port: addr.Port, + }}} }()) if err != nil { log.Fatal(err) diff --git a/tracker/udp/udp_tracker_test.go b/tracker/udp/udp_tracker_test.go index 4f524fd5..51b6dad7 100644 --- a/tracker/udp/udp_tracker_test.go +++ b/tracker/udp/udp_tracker_test.go @@ -5,13 +5,15 @@ import ( "bytes" "crypto/rand" "encoding/binary" - "encoding/hex" "io" "net" + "sync" "syscall" "testing" ) +// Ensure net.IPs are stored big-endian, to match the way they're read from +// the wire. func TestNetIPv4Bytes(t *testing.T) { ip := net.IP([]byte{127, 0, 0, 1}) if ip.String() != "127.0.0.1" { @@ -95,16 +97,55 @@ func TestUDPTracker(t *testing.T) { Event: tracker.Started, } rand.Read(req.PeerId[:]) - n, err := hex.Decode(req.InfoHash[:], []byte("c833bb2b5e7bcb9c07f4c020b4be430c28ba7cdb")) - if err != nil { - t.Fatal(err) - } - if n != len(req.InfoHash) { - panic("nope") - } + copy(req.InfoHash[:], []uint8{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1}) + // TODO: Find out what torrent this info hash corresponds to. + // n, err := hex.Decode(req.InfoHash[:], []byte("c833bb2b5e7bcb9c07f4c020b4be430c28ba7cdb")) + // if err != nil { + // t.Fatal(err) + // } + // if n != len(req.InfoHash) { + // panic("nope") + // } resp, err := tr.Announce(&req) if err != nil { t.Fatal(err) } t.Log(resp) } + +func TestAnnounceRandomInfoHash(t *testing.T) { + wg := sync.WaitGroup{} + for _, url := range []string{ + "udp://tracker.openbittorrent.com:80/announce", + "udp://tracker.publicbt.com:80", + "udp://tracker.istole.it:6969", + "udp://tracker.ccc.de:80", + "udp://tracker.open.demonii.com:1337", + } { + go func(url string) { + defer wg.Done() + tr, err := tracker.New(url) + if err != nil { + t.Fatal(err) + } + if err := tr.Connect(); err != nil { + t.Log(err) + return + } + req := tracker.AnnounceRequest{ + Event: tracker.Stopped, + } + rand.Read(req.PeerId[:]) + rand.Read(req.InfoHash[:]) + resp, err := tr.Announce(&req) + if err != nil { + t.Fatal(err) + } + if resp.Leechers != 0 || resp.Seeders != 0 || len(resp.Peers) != 0 { + t.Fatal(resp) + } + }(url) + wg.Add(1) + } + wg.Wait() +} -- 2.48.1