Basic unchoking and uploading.
Accept incoming connections.
Break out torrentfs lib.
Fix and implement some protocol stuff.
post chan encoding.BinaryMarshaler
write chan []byte
+ // Stuff controlled by the local peer.
Interested bool
Choked bool
Requests map[Request]struct{}
+ // Stuff controlled by the remote peer.
PeerId [20]byte
PeerInterested bool
PeerChoked bool
return true
}
+func (c *Connection) Unchoke() {
+ if !c.Choked {
+ return
+ }
+ c.Post(peer_protocol.Message{
+ Type: peer_protocol.Unchoke,
+ })
+ c.Choked = false
+}
+
func (c *Connection) SetInterested(interested bool) {
if c.Interested == interested {
return
}
func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
+ log.Print(len_)
cl.mu.Lock()
defer cl.mu.Unlock()
t := cl.torrent(ih)
newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize)
- for len_ > 0 {
- index := peer_protocol.Integer(off / int64(t.PieceLength(0)))
- pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
+ for len_ != 0 {
+ // TODO: Write a function to return the Request for a given offset.
+ index := peer_protocol.Integer(off / t.MetaInfo.PieceLength)
+ pieceOff := peer_protocol.Integer(off % t.MetaInfo.PieceLength)
piece := t.Pieces[index]
if !piece.EverHashed {
cl.queuePieceCheck(t, index)
}
chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize}
- adv := int64(chunkSize - pieceOff%chunkSize)
+ if int64(chunk.Length) > len_ {
+ chunk.Length = peer_protocol.Integer(len_)
+ }
+ adv := int64(chunk.Length - pieceOff%chunkSize)
off += adv
len_ -= adv
if _, ok := piece.PendingChunkSpecs[chunk]; !ok && !piece.Hashing {
}
type Peer struct {
- Id [20]byte
- tracker.Peer
+ Id [20]byte
+ IP net.IP
+ Port int
}
func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) {
HalfOpenLimit int
PeerId [20]byte
DataReady chan DataSpec
+ Listener net.Listener
sync.Mutex
mu *sync.Mutex
err = errors.New("unknown torrent")
return
}
- index := peer_protocol.Integer(off / int64(t.PieceLength(0)))
+ index := peer_protocol.Integer(off / t.MetaInfo.PieceLength)
+ // Reading outside the bounds of a file is an error.
+ if index < 0 {
+ err = os.ErrInvalid
+ return
+ }
+ if int(index) >= len(t.Pieces) {
+ err = io.EOF
+ return
+ }
piece := t.Pieces[index]
if !piece.EverHashed {
cl.queuePieceCheck(t, index)
if err != nil {
panic("error generating peer id")
}
+ if c.Listener != nil {
+ go c.acceptConnections()
+ }
+}
+
+func (cl *Client) acceptConnections() {
+ for {
+ conn, err := cl.Listener.Accept()
+ if err != nil {
+ log.Print(err)
+ return
+ }
+ go func() {
+ if err := cl.runConnection(conn, nil); err != nil {
+ log.Print(err)
+ }
+ }()
+ }
}
func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan, err error) {
return
}
log.Printf("connected to %s", conn.RemoteAddr())
- err = me.runConnection(conn, torrent, peer.Id)
+ err = me.runConnection(conn, torrent)
if err != nil {
log.Print(err)
}
return false
}
-func (me *Client) runConnection(sock net.Conn, torrent *Torrent, peerId [20]byte) (err error) {
+func (me *Client) runConnection(sock net.Conn, torrent *Torrent) (err error) {
conn := &Connection{
Socket: sock,
Choked: true,
me.peerUnchoked(torrent, conn)
case peer_protocol.Interested:
conn.PeerInterested = true
+ // TODO: This should be done from a dedicated unchoking routine.
+ conn.Unchoke()
case peer_protocol.NotInterested:
conn.PeerInterested = false
case peer_protocol.Have:
me.peerGotPiece(torrent, conn, int(msg.Index))
case peer_protocol.Request:
- conn.PeerRequests[Request{
+ if conn.PeerRequests == nil {
+ conn.PeerRequests = make(map[Request]struct{}, maxRequests)
+ }
+ request := Request{
Index: msg.Index,
ChunkSpec: ChunkSpec{msg.Begin, msg.Length},
- }] = struct{}{}
+ }
+ conn.PeerRequests[request] = struct{}{}
+ // TODO: Requests should be satisfied from a dedicated upload routine.
+ p := make([]byte, msg.Length)
+ n, err := torrent.Data.ReadAt(p, int64(torrent.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
+ if err != nil {
+ return fmt.Errorf("reading torrent data to serve request %s: %s", request, err)
+ }
+ if n != int(msg.Length) {
+ return fmt.Errorf("bad request: %s", msg)
+ }
+ conn.Post(peer_protocol.Message{
+ Type: peer_protocol.Piece,
+ Index: msg.Index,
+ Begin: msg.Begin,
+ Piece: p,
+ })
case peer_protocol.Bitfield:
if len(msg.Bitfield) < len(torrent.Pieces) {
err = errors.New("received invalid bitfield")
}
me.torrents[torrent.InfoHash] = torrent
go me.announceTorrent(torrent)
- go me.PrioritizeDataRegion(torrent.InfoHash, 0, torrent.Length())
- // for i := range torrent.Pieces {
- // me.queuePieceCheck(torrent, peer_protocol.Integer(i))
- // }
+ for i := range torrent.Pieces {
+ me.queuePieceCheck(torrent, peer_protocol.Integer(i))
+ }
return nil
}
var peers []Peer
for _, peer := range resp.Peers {
peers = append(peers, Peer{
- Peer: peer,
+ IP: peer.IP,
+ Port: peer.Port,
})
}
if err := cl.AddPeers(t.InfoHash, peers); err != nil {
delete(torrent.Pieces[request.Index].PendingChunkSpecs, request.ChunkSpec)
if len(torrent.Pieces[request.Index].PendingChunkSpecs) == 0 {
me.queuePieceCheck(torrent, request.Index)
- return
}
var next *list.Element
for e := torrent.Priorities.Front(); e != nil; e = next {
--- /dev/null
+package torrent
+
+import (
+ "testing"
+)
+
+func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
+ t.SkipNow()
+}
+
+func TestAddTorrentNoUsableURLs(t *testing.T) {
+ t.SkipNow()
+}
+
+func TestAddPeersToUnknownTorrent(t *testing.T) {
+ t.SkipNow()
+}
)
var (
- builtinAnnounceGroups = [][]string{{"udp://tracker.openbittorrent.com:80"}, {"udp://tracker.publicbt.com:80"}, {"udp://tracker.istole.it:6969"}}
+ builtinAnnounceList = [][]string{
+ {"udp://tracker.openbittorrent.com:80"},
+ {"udp://tracker.publicbt.com:80"},
+ {"udp://tracker.istole.it:6969"},
+ }
)
func init() {
log.Print(err)
}
}
- for _, group := range builtinAnnounceGroups {
+ for _, group := range builtinAnnounceList {
b.AddAnnounceGroup(group)
}
batch, err := b.Submit()
mountDir string
)
-const (
- defaultMode = 0555
-)
-
func init() {
flag.StringVar(&downloadDir, "downloadDir", "", "location to save torrent data")
flag.StringVar(&torrentPath, "torrentPath", func() string {
flag.StringVar(&mountDir, "mountDir", "", "location the torrent contents are made available")
}
-type TorrentFS struct {
- Client *torrent.Client
- DataSubs map[chan torrent.DataSpec]struct{}
- sync.Mutex
-}
-
-func (tfs *TorrentFS) publishData() {
- for {
- spec := <-tfs.Client.DataReady
- tfs.Lock()
- for ds := range tfs.DataSubs {
- ds <- spec
- }
- tfs.Unlock()
- }
-}
-
-func (tfs *TorrentFS) SubscribeData() chan torrent.DataSpec {
- ch := make(chan torrent.DataSpec)
- tfs.Lock()
- tfs.DataSubs[ch] = struct{}{}
- tfs.Unlock()
- return ch
-}
-
-func (tfs *TorrentFS) UnsubscribeData(ch chan torrent.DataSpec) {
- go func() {
- for _ = range ch {
- }
- }()
- tfs.Lock()
- delete(tfs.DataSubs, ch)
- tfs.Unlock()
- close(ch)
-}
-
-type rootNode struct {
- fs *TorrentFS
-}
-
-type node struct {
- path []string
- metaInfo *metainfo.MetaInfo
- FS *TorrentFS
- InfoHash torrent.InfoHash
-}
-
-type fileNode struct {
- node
- size uint64
- TorrentOffset int64
-}
-
-func (fn fileNode) Attr() (attr fuse.Attr) {
- attr.Size = fn.size
- attr.Mode = defaultMode
- return
-}
-
-func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fusefs.Intr) fuse.Error {
- if req.Dir {
- panic("hodor")
- }
- dataSpecs := fn.FS.SubscribeData()
- defer fn.FS.UnsubscribeData(dataSpecs)
- data := make([]byte, func() int {
- _len := int64(fn.size) - req.Offset
- if int64(req.Size) < _len {
- return req.Size
- } else {
- // limit read to the end of the file
- return int(_len)
- }
- }())
- infoHash := torrent.BytesInfoHash(fn.metaInfo.InfoHash)
- torrentOff := fn.TorrentOffset + req.Offset
- fn.FS.Client.PrioritizeDataRegion(infoHash, torrentOff, int64(len(data)))
- for {
- n, err := fn.FS.Client.TorrentReadAt(infoHash, torrentOff, data)
- // log.Println(torrentOff, len(data), n, err)
- switch err {
- case nil:
- resp.Data = data[:n]
- return nil
- case torrent.ErrDataNotReady:
- select {
- case <-dataSpecs:
- case <-intr:
- return fuse.EINTR
- }
- default:
- log.Print(err)
- return fuse.EIO
- }
- }
-}
-
-type dirNode struct {
- node
-}
-
-var (
- _ fusefs.HandleReadDirer = dirNode{}
-
- _ fusefs.HandleReader = fileNode{}
-)
-
-func isSubPath(parent, child []string) bool {
- if len(child) <= len(parent) {
- return false
- }
- for i := range parent {
- if parent[i] != child[i] {
- return false
- }
- }
- return true
-}
-
-func (dn dirNode) ReadDir(intr fusefs.Intr) (des []fuse.Dirent, err fuse.Error) {
- names := map[string]bool{}
- for _, fi := range dn.metaInfo.Files {
- if !isSubPath(dn.path, fi.Path) {
- continue
- }
- name := fi.Path[len(dn.path)]
- if names[name] {
- continue
- }
- names[name] = true
- de := fuse.Dirent{
- Name: name,
- }
- if len(fi.Path) == len(dn.path)+1 {
- de.Type = fuse.DT_File
- } else {
- de.Type = fuse.DT_Dir
- }
- des = append(des, de)
- }
- return
-}
-
-func (dn dirNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err fuse.Error) {
- var torrentOffset int64
- for _, fi := range dn.metaInfo.Files {
- if !isSubPath(dn.path, fi.Path) {
- torrentOffset += fi.Length
- continue
- }
- if fi.Path[len(dn.path)] != name {
- torrentOffset += fi.Length
- continue
- }
- __node := dn.node
- __node.path = append(__node.path, name)
- if len(fi.Path) == len(dn.path)+1 {
- _node = fileNode{
- node: __node,
- size: uint64(fi.Length),
- TorrentOffset: torrentOffset,
- }
- } else {
- _node = dirNode{__node}
- }
- break
- }
- if _node == nil {
- err = fuse.ENOENT
- }
- return
-}
-
-func (dn dirNode) Attr() (attr fuse.Attr) {
- attr.Mode = os.ModeDir | defaultMode
- return
-}
-
-func isSingleFileTorrent(mi *metainfo.MetaInfo) bool {
- return len(mi.Files) == 1 && mi.Files[0].Path == nil
-}
-
-func (me rootNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err fuse.Error) {
- for _, _torrent := range me.fs.Client.Torrents() {
- metaInfo := _torrent.MetaInfo
- if metaInfo.Name == name {
- __node := node{
- metaInfo: metaInfo,
- FS: me.fs,
- InfoHash: torrent.BytesInfoHash(metaInfo.InfoHash),
- }
- if isSingleFileTorrent(metaInfo) {
- _node = fileNode{__node, uint64(metaInfo.Files[0].Length), 0}
- } else {
- _node = dirNode{__node}
- }
- break
- }
- }
- if _node == nil {
- err = fuse.ENOENT
- }
- return
-}
-
-func (me rootNode) ReadDir(intr fusefs.Intr) (dirents []fuse.Dirent, err fuse.Error) {
- for _, _torrent := range me.fs.Client.Torrents() {
- metaInfo := _torrent.MetaInfo
- dirents = append(dirents, fuse.Dirent{
- Name: metaInfo.Name,
- Type: func() fuse.DirentType {
- if isSingleFileTorrent(metaInfo) {
- return fuse.DT_File
- } else {
- return fuse.DT_Dir
- }
- }(),
- })
- }
- return
-}
-
-func (rootNode) Attr() fuse.Attr {
- return fuse.Attr{
- Mode: os.ModeDir,
- }
-}
-
-func (tfs *TorrentFS) Root() (fusefs.Node, fuse.Error) {
- return rootNode{tfs}, nil
-}
-
func main() {
pprofAddr := flag.String("pprofAddr", "", "pprof HTTP server bind address")
testPeer := flag.String("testPeer", "", "the address for a test peer")
if *pprofAddr != "" {
go http.ListenAndServe(*pprofAddr, nil)
}
- // defer profile.Start(profile.CPUProfile).Stop()
client := &torrent.Client{
DataDir: downloadDir,
- DataReady: make(chan torrent.DataSpec),
HalfOpenLimit: 2,
}
client.Start()
+++ /dev/null
-package main
-
-import (
- "net"
- "testing"
-)
-
-func TestTCPAddrString(t *testing.T) {
- ta := &net.TCPAddr{
- IP: net.IPv4(127, 0, 0, 1),
- Port: 3000,
- }
- s := ta.String()
- l, err := net.Listen("tcp4", "localhost:3000")
- if err != nil {
- t.Fatal(err)
- }
- defer l.Close()
- c, err := net.Dial("tcp", l.Addr().String())
- if err != nil {
- t.Fatal(err)
- }
- defer c.Close()
- ras := c.RemoteAddr().String()
- if ras != s {
- t.FailNow()
- }
-}
--- /dev/null
+package torrentfs
+
+import (
+ "bazil.org/fuse"
+ fusefs "bazil.org/fuse/fs"
+ "bitbucket.org/anacrolix/go.torrent"
+ metainfo "github.com/nsf/libtorgo/torrent"
+ "os"
+ "sync"
+)
+
+const (
+ defaultMode = 0555
+)
+
+type torrentFS struct {
+ Client *torrent.Client
+ DataSubs map[chan torrent.DataSpec]struct{}
+ sync.Mutex
+}
+
+func (tfs *torrentFS) publishData() {
+ for {
+ spec := <-tfs.Client.DataReady
+ tfs.Lock()
+ for ds := range tfs.DataSubs {
+ ds <- spec
+ }
+ tfs.Unlock()
+ }
+}
+
+func (tfs *torrentFS) SubscribeData() chan torrent.DataSpec {
+ ch := make(chan torrent.DataSpec)
+ tfs.Lock()
+ tfs.DataSubs[ch] = struct{}{}
+ tfs.Unlock()
+ return ch
+}
+
+func (tfs *torrentFS) UnsubscribeData(ch chan torrent.DataSpec) {
+ go func() {
+ for _ = range ch {
+ }
+ }()
+ tfs.Lock()
+ delete(tfs.DataSubs, ch)
+ tfs.Unlock()
+ close(ch)
+}
+
+type rootNode struct {
+ fs *torrentFS
+}
+
+type node struct {
+ path []string
+ metaInfo *metainfo.MetaInfo
+ FS *torrentFS
+ InfoHash torrent.InfoHash
+}
+
+type fileNode struct {
+ node
+ size uint64
+ TorrentOffset int64
+}
+
+func (fn fileNode) Attr() (attr fuse.Attr) {
+ attr.Size = fn.size
+ attr.Mode = defaultMode
+ return
+}
+
+func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fusefs.Intr) fuse.Error {
+ if req.Dir {
+ panic("hodor")
+ }
+ dataSpecs := fn.FS.SubscribeData()
+ defer fn.FS.UnsubscribeData(dataSpecs)
+ data := make([]byte, func() int {
+ _len := int64(fn.size) - req.Offset
+ if int64(req.Size) < _len {
+ return req.Size
+ } else {
+ // limit read to the end of the file
+ return int(_len)
+ }
+ }())
+ if len(data) == 0 {
+ return nil
+ }
+ infoHash := torrent.BytesInfoHash(fn.metaInfo.InfoHash)
+ torrentOff := fn.TorrentOffset + req.Offset
+ fn.FS.Client.PrioritizeDataRegion(infoHash, torrentOff, int64(len(data)))
+ for {
+ n, err := fn.FS.Client.TorrentReadAt(infoHash, torrentOff, data)
+ switch err {
+ case nil:
+ resp.Data = data[:n]
+ return nil
+ case torrent.ErrDataNotReady:
+ select {
+ case <-dataSpecs:
+ case <-intr:
+ return fuse.EINTR
+ }
+ default:
+ return fuse.EIO
+ }
+ }
+}
+
+type dirNode struct {
+ node
+}
+
+var (
+ _ fusefs.HandleReadDirer = dirNode{}
+
+ _ fusefs.HandleReader = fileNode{}
+)
+
+func isSubPath(parent, child []string) bool {
+ if len(child) <= len(parent) {
+ return false
+ }
+ for i := range parent {
+ if parent[i] != child[i] {
+ return false
+ }
+ }
+ return true
+}
+
+func (dn dirNode) ReadDir(intr fusefs.Intr) (des []fuse.Dirent, err fuse.Error) {
+ names := map[string]bool{}
+ for _, fi := range dn.metaInfo.Files {
+ if !isSubPath(dn.path, fi.Path) {
+ continue
+ }
+ name := fi.Path[len(dn.path)]
+ if names[name] {
+ continue
+ }
+ names[name] = true
+ de := fuse.Dirent{
+ Name: name,
+ }
+ if len(fi.Path) == len(dn.path)+1 {
+ de.Type = fuse.DT_File
+ } else {
+ de.Type = fuse.DT_Dir
+ }
+ des = append(des, de)
+ }
+ return
+}
+
+func (dn dirNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err fuse.Error) {
+ var torrentOffset int64
+ for _, fi := range dn.metaInfo.Files {
+ if !isSubPath(dn.path, fi.Path) {
+ torrentOffset += fi.Length
+ continue
+ }
+ if fi.Path[len(dn.path)] != name {
+ torrentOffset += fi.Length
+ continue
+ }
+ __node := dn.node
+ __node.path = append(__node.path, name)
+ if len(fi.Path) == len(dn.path)+1 {
+ _node = fileNode{
+ node: __node,
+ size: uint64(fi.Length),
+ TorrentOffset: torrentOffset,
+ }
+ } else {
+ _node = dirNode{__node}
+ }
+ break
+ }
+ if _node == nil {
+ err = fuse.ENOENT
+ }
+ return
+}
+
+func (dn dirNode) Attr() (attr fuse.Attr) {
+ attr.Mode = os.ModeDir | defaultMode
+ return
+}
+
+func isSingleFileTorrent(mi *metainfo.MetaInfo) bool {
+ return len(mi.Files) == 1 && mi.Files[0].Path == nil
+}
+
+func (me rootNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err fuse.Error) {
+ for _, _torrent := range me.fs.Client.Torrents() {
+ metaInfo := _torrent.MetaInfo
+ if metaInfo.Name == name {
+ __node := node{
+ metaInfo: metaInfo,
+ FS: me.fs,
+ InfoHash: torrent.BytesInfoHash(metaInfo.InfoHash),
+ }
+ if isSingleFileTorrent(metaInfo) {
+ _node = fileNode{__node, uint64(metaInfo.Files[0].Length), 0}
+ } else {
+ _node = dirNode{__node}
+ }
+ break
+ }
+ }
+ if _node == nil {
+ err = fuse.ENOENT
+ }
+ return
+}
+
+func (me rootNode) ReadDir(intr fusefs.Intr) (dirents []fuse.Dirent, err fuse.Error) {
+ for _, _torrent := range me.fs.Client.Torrents() {
+ metaInfo := _torrent.MetaInfo
+ dirents = append(dirents, fuse.Dirent{
+ Name: metaInfo.Name,
+ Type: func() fuse.DirentType {
+ if isSingleFileTorrent(metaInfo) {
+ return fuse.DT_File
+ } else {
+ return fuse.DT_Dir
+ }
+ }(),
+ })
+ }
+ return
+}
+
+func (rootNode) Attr() fuse.Attr {
+ return fuse.Attr{
+ Mode: os.ModeDir,
+ }
+}
+
+func (tfs *torrentFS) Root() (fusefs.Node, fuse.Error) {
+ return rootNode{tfs}, nil
+}
+
+func MountAndServe(dir string, cl *torrent.Client) error {
+ conn, err := fuse.Mount(dir)
+ if err != nil {
+ return err
+ }
+ fs := &torrentFS{
+ Client: cl,
+ DataSubs: make(map[chan torrent.DataSpec]struct{}),
+ }
+ go fs.publishData()
+ return fusefs.Serve(conn, fs)
+}
--- /dev/null
+package torrentfs
+
+import (
+ "bitbucket.org/anacrolix/go.torrent"
+ "bytes"
+ metainfo "github.com/nsf/libtorgo/torrent"
+ "io"
+ "io/ioutil"
+ "net"
+ "os"
+ "path/filepath"
+ "testing"
+)
+
+func TestTCPAddrString(t *testing.T) {
+ ta := &net.TCPAddr{
+ IP: net.IPv4(127, 0, 0, 1),
+ Port: 3000,
+ }
+ s := ta.String()
+ l, err := net.Listen("tcp4", "localhost:3000")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer l.Close()
+ c, err := net.Dial("tcp", l.Addr().String())
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer c.Close()
+ ras := c.RemoteAddr().String()
+ if ras != s {
+ t.FailNow()
+ }
+}
+
+func createDummyTorrentData(dirName string) string {
+ f, _ := os.Create(filepath.Join(dirName, "greeting"))
+ f.WriteString("hello, world\n")
+ return f.Name()
+}
+
+func createMetaInfo(name string, w io.Writer) {
+ builder := metainfo.Builder{}
+ builder.AddFile(name)
+ builder.AddAnnounceGroup([]string{"lol://cheezburger"})
+ batch, err := builder.Submit()
+ if err != nil {
+ panic(err)
+ }
+ errs, _ := batch.Start(w, 1)
+ <-errs
+}
+
+func TestDownloadOnDemand(t *testing.T) {
+ dir, err := ioutil.TempDir("", "torrentfs")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() {
+ if err := os.RemoveAll(dir); err != nil {
+ t.Error(err)
+ }
+ }()
+ t.Logf("test directory: %s", dir)
+ finishedDir := filepath.Join(dir, "finished")
+ os.Mkdir(finishedDir, 0777)
+ name := createDummyTorrentData(finishedDir)
+ metaInfoBuf := &bytes.Buffer{}
+ createMetaInfo(name, metaInfoBuf)
+ metaInfo, err := metainfo.Load(metaInfoBuf)
+ seeder := torrent.Client{
+ DataDir: finishedDir,
+ Listener: func() net.Listener {
+ conn, err := net.Listen("tcp", ":0")
+ if err != nil {
+ panic(err)
+ }
+ return conn
+ }(),
+ }
+ seeder.Start()
+ seeder.AddTorrent(metaInfo)
+ leecher := torrent.Client{
+ DataDir: filepath.Join(dir, "download"),
+ DataReady: make(chan torrent.DataSpec),
+ }
+ leecher.Start()
+ leecher.AddTorrent(metaInfo)
+ leecher.AddPeers(torrent.BytesInfoHash(metaInfo.InfoHash), []torrent.Peer{func() torrent.Peer {
+ tcpAddr := seeder.Listener.Addr().(*net.TCPAddr)
+ return torrent.Peer{
+ IP: tcpAddr.IP,
+ Port: tcpAddr.Port,
+ }
+ }()})
+ mountDir := filepath.Join(dir, "mnt")
+ os.Mkdir(mountDir, 0777)
+ err = MountAndServe(mountDir, &leecher)
+ if err != nil {
+ t.Fatal(err)
+ }
+}
)
const (
- Choke MessageType = iota
- Unchoke
- Interested
- NotInterested
- Have
- Bitfield
- Request
- Piece
- Cancel
+ Choke MessageType = iota
+ Unchoke // 1
+ Interested // 2
+ NotInterested // 3
+ Have // 4
+ Bitfield // 5
+ Request // 6
+ Piece // 7
+ Cancel // 8
)
type Message struct {
}
case Bitfield:
_, err = buf.Write(marshalBitfield(msg.Bitfield))
+ case Piece:
+ for _, i := range []Integer{msg.Index, msg.Begin} {
+ err = binary.Write(buf, binary.BigEndian, i)
+ if err != nil {
+ return
+ }
+ }
+ n, err := buf.Write(msg.Piece)
+ if err != nil {
+ break
+ }
+ if n != len(msg.Piece) {
+ panic(n)
+ }
default:
- err = errors.New("unknown message type")
+ err = fmt.Errorf("unknown message type: %s", msg.Type)
}
data = make([]byte, 4+buf.Len())
binary.BigEndian.PutUint32(data, uint32(buf.Len()))
case Have:
err = msg.Index.Read(r)
case Request, Cancel:
- err = binary.Read(r, binary.BigEndian, []*Integer{&msg.Index, &msg.Begin, &msg.Length})
+ for _, data := range []*Integer{&msg.Index, &msg.Begin, &msg.Length} {
+ err = data.Read(r)
+ if err != nil {
+ break
+ }
+ }
case Bitfield:
b := make([]byte, length-1)
_, err = io.ReadFull(r, b)
package peer_protocol
import (
+ "bytes"
"testing"
)
+func TestBinaryReadSliceOfPointers(t *testing.T) {
+ var msg Message
+ r := bytes.NewBufferString("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00@\x00")
+ if r.Len() != 12 {
+ t.Fatalf("expected 12 bytes left, but there %d", r.Len())
+ }
+ for _, data := range []*Integer{&msg.Index, &msg.Begin, &msg.Length} {
+ err := data.Read(r)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+ if r.Len() != 0 {
+ t.FailNow()
+ }
+}
+
func TestConstants(t *testing.T) {
// check that iota works as expected in the const block
if NotInterested != 3 {