]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Make use of trackers
authorMatt Joiner <anacrolix@gmail.com>
Sun, 16 Mar 2014 15:30:10 +0000 (02:30 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 16 Mar 2014 15:30:10 +0000 (02:30 +1100)
client.go
cmd/torrent/main.go
tracker/udp/udp_tracker_test.go

index 2b0b6d155c25dbc644a0e0645ef05e008a225ed3..f874daba7e752c44b2151a509acea0da33b5fd80 100644 (file)
--- a/client.go
+++ b/client.go
@@ -2,6 +2,8 @@ package torrent
 
 import (
        "bitbucket.org/anacrolix/go.torrent/peer_protocol"
+       "bitbucket.org/anacrolix/go.torrent/tracker"
+       _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
        "bufio"
        "container/list"
        "crypto"
@@ -13,6 +15,7 @@ import (
        "io"
        "launchpad.net/gommap"
        "log"
+       mathRand "math/rand"
        "net"
        "os"
        "path/filepath"
@@ -222,6 +225,13 @@ type Torrent struct {
        Conns      []*Connection
        Peers      []Peer
        Priorities *list.List
+       // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
+       // mirror their respective URLs from the announce-list key.
+       Trackers [][]tracker.Client
+}
+
+func (t *Torrent) Length() int64 {
+       return int64(t.PieceLength(peer_protocol.Integer(len(t.Pieces)-1))) + int64(len(t.Pieces)-1)*int64(t.PieceLength(0))
 }
 
 func (t *Torrent) Close() (err error) {
@@ -261,6 +271,7 @@ func (t *Torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) {
        return slice.Indices
 }
 
+// Currently doesn't really queue, but should in the future.
 func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) {
        piece := t.Pieces[pieceIndex]
        if piece.Hashing {
@@ -346,9 +357,8 @@ func (t *Torrent) requestHeat() (ret map[Request]int) {
 }
 
 type Peer struct {
-       Id   [20]byte
-       IP   net.IP
-       Port int
+       Id [20]byte
+       tracker.Peer
 }
 
 func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) {
@@ -536,6 +546,15 @@ func (me *Client) initiateConn(peer Peer, torrent *Torrent) {
        }()
 }
 
+func (t *Torrent) haveAllPieces() bool {
+       for _, piece := range t.Pieces {
+               if !piece.Complete() {
+                       return false
+               }
+       }
+       return true
+}
+
 func (me *Torrent) haveAnyPieces() bool {
        for _, piece := range me.Pieces {
                if piece.Complete() {
@@ -594,7 +613,10 @@ func (me *Client) runConnection(sock net.Conn, torrent *Torrent, peerId [20]byte
                conn.post <- peer_protocol.Bytes(me.PeerId[:])
        }
        me.mu.Lock()
-       me.addConnection(torrent, conn)
+       defer me.mu.Unlock()
+       if !me.addConnection(torrent, conn) {
+               return
+       }
        if torrent.haveAnyPieces() {
                conn.Post(peer_protocol.Message{
                        Type:     peer_protocol.Bitfield,
@@ -606,7 +628,6 @@ func (me *Client) runConnection(sock net.Conn, torrent *Torrent, peerId [20]byte
                err = fmt.Errorf("during Connection loop: %s", err)
        }
        me.dropConnection(torrent, conn)
-       me.mu.Unlock()
        return
 }
 
@@ -714,8 +735,9 @@ func (me *Client) dropConnection(torrent *Torrent, conn *Connection) {
 }
 
 func (me *Client) addConnection(t *Torrent, c *Connection) bool {
-       for _, c := range t.Conns {
-               if c.PeerId == c.PeerId {
+       for _, c0 := range t.Conns {
+               if c.PeerId == c0.PeerId {
+                       log.Printf("%s and %s have the same ID: %s", c.Socket.RemoteAddr(), c0.Socket.RemoteAddr(), c.PeerId)
                        return false
                }
        }
@@ -748,22 +770,53 @@ func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
        return nil
 }
 
-func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
-       torrent := &Torrent{
+// Prepare a Torrent without any attachment to a Client. That means we can
+// initialize fields all fields that don't require the Client without locking
+// it.
+func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (torrent *Torrent, err error) {
+       torrent = &Torrent{
                InfoHash: BytesInfoHash(metaInfo.InfoHash),
                MetaInfo: metaInfo,
        }
        for offset := 0; offset < len(metaInfo.Pieces); offset += PieceHash.Size() {
                hash := metaInfo.Pieces[offset : offset+PieceHash.Size()]
                if len(hash) != PieceHash.Size() {
-                       return errors.New("bad piece hash in metainfo")
+                       err = errors.New("bad piece hash in metainfo")
+                       return
                }
                piece := &piece{}
                copyHashSum(piece.Hash[:], hash)
                torrent.Pieces = append(torrent.Pieces, piece)
        }
-       var err error
-       torrent.Data, err = mmapTorrentData(metaInfo, me.DataDir)
+       torrent.Data, err = mmapTorrentData(metaInfo, dataDir)
+       if err != nil {
+               return
+       }
+       torrent.Trackers = make([][]tracker.Client, len(metaInfo.AnnounceList))
+       for tierIndex := range metaInfo.AnnounceList {
+               tier := torrent.Trackers[tierIndex]
+               for _, url := range metaInfo.AnnounceList[tierIndex] {
+                       tr, err := tracker.New(url)
+                       if err != nil {
+                               log.Print(err)
+                               continue
+                       }
+                       tier = append(tier, tr)
+               }
+               // The trackers within each tier must be shuffled before use.
+               // http://stackoverflow.com/a/12267471/149482
+               // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
+               for i := range tier {
+                       j := mathRand.Intn(i + 1)
+                       tier[i], tier[j] = tier[j], tier[i]
+               }
+               torrent.Trackers[tierIndex] = tier
+       }
+       return
+}
+
+func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
+       torrent, err := newTorrent(metaInfo, me.DataDir)
        if err != nil {
                return err
        }
@@ -773,12 +826,66 @@ func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
                return torrent.Close()
        }
        me.torrents[torrent.InfoHash] = torrent
+       go me.announceTorrent(torrent)
+       go me.PrioritizeDataRegion(torrent.InfoHash, 0, torrent.Length())
+       // for i := range torrent.Pieces {
+       //      me.queuePieceCheck(torrent, peer_protocol.Integer(i))
+       // }
        return nil
 }
 
+func (cl *Client) announceTorrent(t *Torrent) {
+       req := tracker.AnnounceRequest{
+               Event:   tracker.Started,
+               NumWant: -1,
+       }
+       req.PeerId = cl.PeerId
+       req.InfoHash = t.InfoHash
+newAnnounce:
+       for {
+               for _, tier := range t.Trackers {
+                       for trIndex, tr := range tier {
+                               if err := tr.Connect(); err != nil {
+                                       log.Print(err)
+                                       continue
+                               }
+                               resp, err := tr.Announce(&req)
+                               if err != nil {
+                                       log.Print(err)
+                                       continue
+                               }
+                               var peers []Peer
+                               for _, peer := range resp.Peers {
+                                       peers = append(peers, Peer{
+                                               Peer: peer,
+                                       })
+                               }
+                               if err := cl.AddPeers(t.InfoHash, peers); err != nil {
+                                       log.Print(err)
+                                       return
+                               }
+                               log.Printf("%d new peers from %s", len(peers), "TODO")
+                               tier[0], tier[trIndex] = tier[trIndex], tier[0]
+                               time.Sleep(time.Second * time.Duration(resp.Interval))
+                               continue newAnnounce
+                       }
+               }
+               time.Sleep(time.Second)
+       }
+}
+
+func (cl *Client) allTorrentsCompleted() bool {
+       for _, t := range cl.torrents {
+               if !t.haveAllPieces() {
+                       return false
+               }
+       }
+       return true
+}
+
 func (me *Client) WaitAll() {
        me.mu.Lock()
-       for len(me.torrents) != 0 {
+       for !me.allTorrentsCompleted() {
                me.event.Wait()
        }
        me.mu.Unlock()
@@ -900,6 +1007,7 @@ func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct b
                        }
                }
        }
+       me.event.Broadcast()
 }
 
 func (cl *Client) verifyPiece(t *Torrent, index peer_protocol.Integer) {
index f333d7204a48108f11503d85f4bfc11a5b32cc53..6f53f6c0ec419f94413436cabdb8e96cfb416820 100644 (file)
@@ -2,6 +2,7 @@ package main
 
 import (
        "bitbucket.org/anacrolix/go.torrent"
+       "bitbucket.org/anacrolix/go.torrent/tracker"
        "flag"
        "fmt"
        metainfo "github.com/nsf/libtorgo/torrent"
@@ -29,6 +30,7 @@ func main() {
        }
        client := torrent.Client{
                DataDir: *downloadDir,
+               // HalfOpenLimit: 2,
        }
        client.Start()
        defer client.Stop()
@@ -54,9 +56,10 @@ func main() {
                                log.Fatal(err)
                        }
                        return []torrent.Peer{{
-                               IP:   addr.IP,
-                               Port: addr.Port,
-                       }}
+                               Peer: tracker.Peer{
+                                       IP:   addr.IP,
+                                       Port: addr.Port,
+                               }}}
                }())
                if err != nil {
                        log.Fatal(err)
index 4f524fd5fd279eb02aa5bbb60ea39a2ddc8f2a64..51b6dad7a97d773e8e2a737017b11840d2de7df4 100644 (file)
@@ -5,13 +5,15 @@ import (
        "bytes"
        "crypto/rand"
        "encoding/binary"
-       "encoding/hex"
        "io"
        "net"
+       "sync"
        "syscall"
        "testing"
 )
 
+// Ensure net.IPs are stored big-endian, to match the way they're read from
+// the wire.
 func TestNetIPv4Bytes(t *testing.T) {
        ip := net.IP([]byte{127, 0, 0, 1})
        if ip.String() != "127.0.0.1" {
@@ -95,16 +97,55 @@ func TestUDPTracker(t *testing.T) {
                Event:   tracker.Started,
        }
        rand.Read(req.PeerId[:])
-       n, err := hex.Decode(req.InfoHash[:], []byte("c833bb2b5e7bcb9c07f4c020b4be430c28ba7cdb"))
-       if err != nil {
-               t.Fatal(err)
-       }
-       if n != len(req.InfoHash) {
-               panic("nope")
-       }
+       copy(req.InfoHash[:], []uint8{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1})
+       // TODO: Find out what torrent this info hash corresponds to.
+       // n, err := hex.Decode(req.InfoHash[:], []byte("c833bb2b5e7bcb9c07f4c020b4be430c28ba7cdb"))
+       // if err != nil {
+       //      t.Fatal(err)
+       // }
+       // if n != len(req.InfoHash) {
+       //      panic("nope")
+       // }
        resp, err := tr.Announce(&req)
        if err != nil {
                t.Fatal(err)
        }
        t.Log(resp)
 }
+
+func TestAnnounceRandomInfoHash(t *testing.T) {
+       wg := sync.WaitGroup{}
+       for _, url := range []string{
+               "udp://tracker.openbittorrent.com:80/announce",
+               "udp://tracker.publicbt.com:80",
+               "udp://tracker.istole.it:6969",
+               "udp://tracker.ccc.de:80",
+               "udp://tracker.open.demonii.com:1337",
+       } {
+               go func(url string) {
+                       defer wg.Done()
+                       tr, err := tracker.New(url)
+                       if err != nil {
+                               t.Fatal(err)
+                       }
+                       if err := tr.Connect(); err != nil {
+                               t.Log(err)
+                               return
+                       }
+                       req := tracker.AnnounceRequest{
+                               Event: tracker.Stopped,
+                       }
+                       rand.Read(req.PeerId[:])
+                       rand.Read(req.InfoHash[:])
+                       resp, err := tr.Announce(&req)
+                       if err != nil {
+                               t.Fatal(err)
+                       }
+                       if resp.Leechers != 0 || resp.Seeders != 0 || len(resp.Peers) != 0 {
+                               t.Fatal(resp)
+                       }
+               }(url)
+               wg.Add(1)
+       }
+       wg.Wait()
+}