]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Begin implementing the adding of peers, and initiating of connections
authorMatt Joiner <anacrolix@gmail.com>
Sat, 28 Sep 2013 22:11:24 +0000 (08:11 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sat, 28 Sep 2013 22:11:24 +0000 (08:11 +1000)
client.go
cmd/torrent/main.go
peer_protocol/protocol.go [new file with mode: 0644]
peer_protocol/protocol_test.go [moved from protocol/protocol_test.go with 87% similarity]
protocol/protocol.go [deleted file]

index 54cf853ca7873b39e7a2ee58e8d04d325c341ff0..e5811dccc6a90a6f4cbe787ce65bcaf65bdd0ec1 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1,11 +1,15 @@
 package torrent
 
 import (
+       "bitbucket.org/anacrolix/go.torrent/peer_protocol"
        "crypto"
+       "crypto/rand"
        "errors"
        metainfo "github.com/nsf/libtorgo/torrent"
        "io"
        "launchpad.net/gommap"
+       "log"
+       "net"
        "os"
        "path/filepath"
 )
@@ -14,7 +18,7 @@ const (
        PieceHash = crypto.SHA1
 )
 
-type infoHash [20]byte
+type InfoHash [20]byte
 
 type pieceSum [20]byte
 
@@ -24,7 +28,7 @@ func copyHashSum(dst, src []byte) {
        }
 }
 
-func BytesInfoHash(b []byte) (ih infoHash) {
+func BytesInfoHash(b []byte) (ih InfoHash) {
        if len(b) != len(ih) {
                panic("bad infohash bytes")
        }
@@ -44,11 +48,33 @@ type piece struct {
        Hash  pieceSum
 }
 
+type connection struct {
+       Socket net.Conn
+       post   chan peer_protocol.Message
+
+       Interested bool
+       Choked     bool
+       Requests   []peer_protocol.Request
+
+       PeerId         [20]byte
+       PeerInterested bool
+       PeerChoked     bool
+       PeerRequests   []peer_protocol.Request
+}
+
 type torrent struct {
-       InfoHash infoHash
+       InfoHash InfoHash
        Pieces   []piece
        Data     MMapSpan
        MetaInfo *metainfo.MetaInfo
+       Conns    []connection
+       Peers    []Peer
+}
+
+type Peer struct {
+       Id   [20]byte
+       IP   net.IP
+       Port int
 }
 
 func (t torrent) PieceSize(piece int) (size int64) {
@@ -79,25 +105,35 @@ func (t torrent) HashPiece(piece int) (ps pieceSum) {
 }
 
 type client struct {
-       DataDir string
+       DataDir       string
+       HalfOpenLimit int
+       PeerId        [20]byte
+
+       halfOpen int
+       torrents map[InfoHash]*torrent
 
        noTorrents      chan struct{}
        addTorrent      chan *torrent
-       torrents        map[infoHash]*torrent
-       torrentFinished chan infoHash
+       torrentFinished chan InfoHash
        actorTask       chan func()
 }
 
 func NewClient(dataDir string) *client {
        c := &client{
-               DataDir: dataDir,
+               DataDir:       dataDir,
+               HalfOpenLimit: 10,
+
+               torrents: make(map[InfoHash]*torrent),
 
                noTorrents:      make(chan struct{}),
                addTorrent:      make(chan *torrent),
-               torrents:        make(map[infoHash]*torrent),
-               torrentFinished: make(chan infoHash),
+               torrentFinished: make(chan InfoHash),
                actorTask:       make(chan func()),
        }
+       _, err := rand.Read(c.PeerId[:])
+       if err != nil {
+               panic("error generating peer id")
+       }
        go c.run()
        return c
 }
@@ -144,6 +180,67 @@ func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan
        return
 }
 
+func (me *client) torrent(ih InfoHash) *torrent {
+       for _, t := range me.torrents {
+               if t.InfoHash == ih {
+                       return t
+               }
+       }
+       return nil
+}
+
+func (me *client) initiateConn(peer Peer, torrent *torrent) {
+       if peer.Id == me.PeerId {
+               return
+       }
+       me.halfOpen++
+       go func() {
+               conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{
+                       IP:   peer.IP,
+                       Port: peer.Port,
+               })
+               me.withContext(func() {
+                       me.halfOpen--
+                       me.openNewConns()
+               })
+               if err != nil {
+                       log.Printf("error connecting to peer: %s", err)
+                       return
+               }
+               me.runConnection(conn, torrent, peer.Id)
+       }()
+}
+
+func (me *client) runConnection(conn net.Conn, torrent *torrent, peerId [20]byte) {
+       log.Fatalf("connected to %s", conn)
+}
+
+func (me *client) openNewConns() {
+       for _, t := range me.torrents {
+               for len(t.Peers) != 0 {
+                       if me.halfOpen >= me.HalfOpenLimit {
+                               return
+                       }
+                       p := t.Peers[0]
+                       t.Peers = t.Peers[1:]
+                       me.initiateConn(p, t)
+               }
+       }
+}
+
+func (me *client) AddPeers(infoHash InfoHash, peers []Peer) (err error) {
+       me.withContext(func() {
+               t := me.torrent(infoHash)
+               if t == nil {
+                       err = errors.New("no such torrent")
+                       return
+               }
+               t.Peers = append(t.Peers, peers...)
+               me.openNewConns()
+       })
+       return
+}
+
 func (me *client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
        torrent := &torrent{
                InfoHash: BytesInfoHash(metaInfo.InfoHash),
@@ -178,7 +275,7 @@ func (me *client) withContext(f func()) {
        me.actorTask <- f
 }
 
-func (me *client) pieceHashed(ih infoHash, piece int, correct bool) {
+func (me *client) pieceHashed(ih InfoHash, piece int, correct bool) {
        torrent := me.torrents[ih]
        torrent.Pieces[piece].State = func() pieceState {
                if correct {
index 372d397cafb89cf0f190e96c981309ddbe676a4b..476e81d7951262f53493195015e2a182f5dc1cde 100644 (file)
@@ -5,6 +5,7 @@ import (
        "flag"
        metainfo "github.com/nsf/libtorgo/torrent"
        "log"
+       "net"
 )
 
 var (
@@ -26,6 +27,13 @@ func main() {
                if err != nil {
                        log.Fatal(err)
                }
+               err = client.AddPeers(torrent.BytesInfoHash(metaInfo.InfoHash), []torrent.Peer{{
+                       IP:   net.IPv4(127, 0, 0, 1),
+                       Port: 63983,
+               }})
+               if err != nil {
+                       log.Fatal(err)
+               }
        }
        client.WaitAll()
        client.Close()
diff --git a/peer_protocol/protocol.go b/peer_protocol/protocol.go
new file mode 100644 (file)
index 0000000..121da9f
--- /dev/null
@@ -0,0 +1,29 @@
+package peer_protocol
+
+type (
+       MessageType byte
+       Integer     uint32
+)
+
+const (
+       Choke MessageType = iota
+       Unchoke
+       Interested
+       NotInterested
+       Have
+       Bitfield
+       RequestType
+       Piece
+       Cancel
+)
+
+type Request struct {
+       Index, Begin, Length Integer
+}
+
+type Message struct {
+       KeepAlive bool
+       Type      MessageType
+       Bitfield  []bool
+       Piece     []byte
+}
similarity index 87%
rename from protocol/protocol_test.go
rename to peer_protocol/protocol_test.go
index 32db2a626ad4d9ed69bd8e45c8da26bb1b7f0cce..6306d52c042df8f1f8e0f2a29f9c61444283e71c 100644 (file)
@@ -1,4 +1,4 @@
-package protocol
+package peer_protocol
 
 import (
        "testing"
diff --git a/protocol/protocol.go b/protocol/protocol.go
deleted file mode 100644 (file)
index c4f2cb1..0000000
+++ /dev/null
@@ -1,116 +0,0 @@
-package protocol
-
-import (
-       "bytes"
-       "encoding/binary"
-       "errors"
-       // "errors"
-       "fmt"
-       "io"
-       "io/ioutil"
-       // "os"
-)
-
-type (
-       MessageType byte
-       Integer     uint32
-       PieceIndex  Integer
-       PieceOffset Integer
-)
-
-const (
-       Choke MessageType = iota
-       Unchoke
-       Interested
-       NotInterested
-       Have
-       Bitfield
-       Request
-       Piece
-       Cancel
-)
-
-type Message struct {
-       KeepAlive bool
-       Type      MessageType
-       Bitfield  []bool
-       Index     PieceIndex
-       Begin     PieceOffset
-       Length    Integer
-       Piece     []byte
-}
-
-func (msg *Message) UnmarshalReader(r io.Reader) (err error) {
-       err = binary.Read(r, binary.BigEndian, &msg.Type)
-       switch err {
-       case nil:
-               msg.KeepAlive = false
-       case io.EOF:
-               msg.KeepAlive = true
-               err = nil
-               return
-       default:
-               return
-       }
-       switch msg.Type {
-       case Choke, Unchoke, Interested, NotInterested:
-       case Have:
-               err = binary.Read(r, binary.BigEndian, &msg.Index)
-       case Request, Cancel:
-               err = binary.Read(r, binary.BigEndian, &msg.Index)
-               if err != nil {
-                       return
-               }
-               err = binary.Read(r, binary.BigEndian, &msg.Begin)
-               if err != nil {
-                       return
-               }
-               err = binary.Read(r, binary.BigEndian, &msg.Length)
-       case Bitfield:
-               // var bf []byte
-               _, err = ioutil.ReadAll(r)
-               if err != nil {
-                       return
-               }
-       case Piece:
-       default:
-               return fmt.Errorf("unknown type: %v", msg.Type)
-       }
-       return
-}
-
-func (msg *Message) MarshalBinary() (b []byte, err error) {
-       w := &bytes.Buffer{}
-       if msg.KeepAlive {
-               b = w.Bytes()
-               return
-       }
-       err = w.WriteByte(byte(msg.Type))
-       if err != nil {
-               return
-       }
-       switch msg.Type {
-       case Choke, Unchoke, Interested, NotInterested:
-       case Have:
-               err = binary.Write(w, binary.BigEndian, msg.Index)
-       case Request, Cancel:
-               err = binary.Write(w, binary.BigEndian, msg.Index)
-               if err != nil {
-                       return
-               }
-               err = binary.Write(w, binary.BigEndian, msg.Begin)
-               if err != nil {
-                       return
-               }
-               err = binary.Write(w, binary.BigEndian, msg.Length)
-       case Bitfield, Piece:
-               panic("unimplemented")
-       default:
-               err = errors.New("unknown type")
-               return
-       }
-       if err == nil {
-               b = w.Bytes()
-       }
-       return
-}