package torrent
import (
+ "bitbucket.org/anacrolix/go.torrent/peer_protocol"
"crypto"
+ "crypto/rand"
"errors"
metainfo "github.com/nsf/libtorgo/torrent"
"io"
"launchpad.net/gommap"
+ "log"
+ "net"
"os"
"path/filepath"
)
PieceHash = crypto.SHA1
)
-type infoHash [20]byte
+type InfoHash [20]byte
type pieceSum [20]byte
}
}
-func BytesInfoHash(b []byte) (ih infoHash) {
+func BytesInfoHash(b []byte) (ih InfoHash) {
if len(b) != len(ih) {
panic("bad infohash bytes")
}
Hash pieceSum
}
+type connection struct {
+ Socket net.Conn
+ post chan peer_protocol.Message
+
+ Interested bool
+ Choked bool
+ Requests []peer_protocol.Request
+
+ PeerId [20]byte
+ PeerInterested bool
+ PeerChoked bool
+ PeerRequests []peer_protocol.Request
+}
+
type torrent struct {
- InfoHash infoHash
+ InfoHash InfoHash
Pieces []piece
Data MMapSpan
MetaInfo *metainfo.MetaInfo
+ Conns []connection
+ Peers []Peer
+}
+
+type Peer struct {
+ Id [20]byte
+ IP net.IP
+ Port int
}
func (t torrent) PieceSize(piece int) (size int64) {
}
type client struct {
- DataDir string
+ DataDir string
+ HalfOpenLimit int
+ PeerId [20]byte
+
+ halfOpen int
+ torrents map[InfoHash]*torrent
noTorrents chan struct{}
addTorrent chan *torrent
- torrents map[infoHash]*torrent
- torrentFinished chan infoHash
+ torrentFinished chan InfoHash
actorTask chan func()
}
func NewClient(dataDir string) *client {
c := &client{
- DataDir: dataDir,
+ DataDir: dataDir,
+ HalfOpenLimit: 10,
+
+ torrents: make(map[InfoHash]*torrent),
noTorrents: make(chan struct{}),
addTorrent: make(chan *torrent),
- torrents: make(map[infoHash]*torrent),
- torrentFinished: make(chan infoHash),
+ torrentFinished: make(chan InfoHash),
actorTask: make(chan func()),
}
+ _, err := rand.Read(c.PeerId[:])
+ if err != nil {
+ panic("error generating peer id")
+ }
go c.run()
return c
}
return
}
+func (me *client) torrent(ih InfoHash) *torrent {
+ for _, t := range me.torrents {
+ if t.InfoHash == ih {
+ return t
+ }
+ }
+ return nil
+}
+
+func (me *client) initiateConn(peer Peer, torrent *torrent) {
+ if peer.Id == me.PeerId {
+ return
+ }
+ me.halfOpen++
+ go func() {
+ conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{
+ IP: peer.IP,
+ Port: peer.Port,
+ })
+ me.withContext(func() {
+ me.halfOpen--
+ me.openNewConns()
+ })
+ if err != nil {
+ log.Printf("error connecting to peer: %s", err)
+ return
+ }
+ me.runConnection(conn, torrent, peer.Id)
+ }()
+}
+
+func (me *client) runConnection(conn net.Conn, torrent *torrent, peerId [20]byte) {
+ log.Fatalf("connected to %s", conn)
+}
+
+func (me *client) openNewConns() {
+ for _, t := range me.torrents {
+ for len(t.Peers) != 0 {
+ if me.halfOpen >= me.HalfOpenLimit {
+ return
+ }
+ p := t.Peers[0]
+ t.Peers = t.Peers[1:]
+ me.initiateConn(p, t)
+ }
+ }
+}
+
+func (me *client) AddPeers(infoHash InfoHash, peers []Peer) (err error) {
+ me.withContext(func() {
+ t := me.torrent(infoHash)
+ if t == nil {
+ err = errors.New("no such torrent")
+ return
+ }
+ t.Peers = append(t.Peers, peers...)
+ me.openNewConns()
+ })
+ return
+}
+
func (me *client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
torrent := &torrent{
InfoHash: BytesInfoHash(metaInfo.InfoHash),
me.actorTask <- f
}
-func (me *client) pieceHashed(ih infoHash, piece int, correct bool) {
+func (me *client) pieceHashed(ih InfoHash, piece int, correct bool) {
torrent := me.torrents[ih]
torrent.Pieces[piece].State = func() pieceState {
if correct {
+++ /dev/null
-package protocol
-
-import (
- "bytes"
- "encoding/binary"
- "errors"
- // "errors"
- "fmt"
- "io"
- "io/ioutil"
- // "os"
-)
-
-type (
- MessageType byte
- Integer uint32
- PieceIndex Integer
- PieceOffset Integer
-)
-
-const (
- Choke MessageType = iota
- Unchoke
- Interested
- NotInterested
- Have
- Bitfield
- Request
- Piece
- Cancel
-)
-
-type Message struct {
- KeepAlive bool
- Type MessageType
- Bitfield []bool
- Index PieceIndex
- Begin PieceOffset
- Length Integer
- Piece []byte
-}
-
-func (msg *Message) UnmarshalReader(r io.Reader) (err error) {
- err = binary.Read(r, binary.BigEndian, &msg.Type)
- switch err {
- case nil:
- msg.KeepAlive = false
- case io.EOF:
- msg.KeepAlive = true
- err = nil
- return
- default:
- return
- }
- switch msg.Type {
- case Choke, Unchoke, Interested, NotInterested:
- case Have:
- err = binary.Read(r, binary.BigEndian, &msg.Index)
- case Request, Cancel:
- err = binary.Read(r, binary.BigEndian, &msg.Index)
- if err != nil {
- return
- }
- err = binary.Read(r, binary.BigEndian, &msg.Begin)
- if err != nil {
- return
- }
- err = binary.Read(r, binary.BigEndian, &msg.Length)
- case Bitfield:
- // var bf []byte
- _, err = ioutil.ReadAll(r)
- if err != nil {
- return
- }
- case Piece:
- default:
- return fmt.Errorf("unknown type: %v", msg.Type)
- }
- return
-}
-
-func (msg *Message) MarshalBinary() (b []byte, err error) {
- w := &bytes.Buffer{}
- if msg.KeepAlive {
- b = w.Bytes()
- return
- }
- err = w.WriteByte(byte(msg.Type))
- if err != nil {
- return
- }
- switch msg.Type {
- case Choke, Unchoke, Interested, NotInterested:
- case Have:
- err = binary.Write(w, binary.BigEndian, msg.Index)
- case Request, Cancel:
- err = binary.Write(w, binary.BigEndian, msg.Index)
- if err != nil {
- return
- }
- err = binary.Write(w, binary.BigEndian, msg.Begin)
- if err != nil {
- return
- }
- err = binary.Write(w, binary.BigEndian, msg.Length)
- case Bitfield, Piece:
- panic("unimplemented")
- default:
- err = errors.New("unknown type")
- return
- }
- if err == nil {
- b = w.Bytes()
- }
- return
-}