From 2e81f914e7ec486e0ad4d0f390f6f5bf942fac67 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 18 Mar 2014 01:44:22 +1100 Subject: [PATCH] Add a end-to-end test for torrentfs Basic unchoking and uploading. Accept incoming connections. Break out torrentfs lib. Fix and implement some protocol stuff. --- client.go | 100 ++++++++++--- client_test.go | 17 +++ cmd/torrent-create/main.go | 8 +- cmd/torrentfs/main.go | 238 ------------------------------ cmd/torrentfs/main_test.go | 28 ---- fs/torrentfs.go | 260 +++++++++++++++++++++++++++++++++ fs/torrentfs_test.go | 103 +++++++++++++ peer_protocol/protocol.go | 41 ++++-- peer_protocol/protocol_test.go | 18 +++ 9 files changed, 517 insertions(+), 296 deletions(-) create mode 100644 client_test.go delete mode 100644 cmd/torrentfs/main_test.go create mode 100644 fs/torrentfs.go create mode 100644 fs/torrentfs_test.go diff --git a/client.go b/client.go index f874daba..83d0d6a0 100644 --- a/client.go +++ b/client.go @@ -91,10 +91,12 @@ type Connection struct { post chan encoding.BinaryMarshaler write chan []byte + // Stuff controlled by the local peer. Interested bool Choked bool Requests map[Request]struct{} + // Stuff controlled by the remote peer. PeerId [20]byte PeerInterested bool PeerChoked bool @@ -142,6 +144,16 @@ func (c *Connection) Request(chunk Request) bool { return true } +func (c *Connection) Unchoke() { + if !c.Choked { + return + } + c.Post(peer_protocol.Message{ + Type: peer_protocol.Unchoke, + }) + c.Choked = false +} + func (c *Connection) SetInterested(interested bool) { if c.Interested == interested { return @@ -282,19 +294,24 @@ func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) } func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) { + log.Print(len_) cl.mu.Lock() defer cl.mu.Unlock() t := cl.torrent(ih) newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize) - for len_ > 0 { - index := peer_protocol.Integer(off / int64(t.PieceLength(0))) - pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0))) + for len_ != 0 { + // TODO: Write a function to return the Request for a given offset. + index := peer_protocol.Integer(off / t.MetaInfo.PieceLength) + pieceOff := peer_protocol.Integer(off % t.MetaInfo.PieceLength) piece := t.Pieces[index] if !piece.EverHashed { cl.queuePieceCheck(t, index) } chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize} - adv := int64(chunkSize - pieceOff%chunkSize) + if int64(chunk.Length) > len_ { + chunk.Length = peer_protocol.Integer(len_) + } + adv := int64(chunk.Length - pieceOff%chunkSize) off += adv len_ -= adv if _, ok := piece.PendingChunkSpecs[chunk]; !ok && !piece.Hashing { @@ -357,8 +374,9 @@ func (t *Torrent) requestHeat() (ret map[Request]int) { } type Peer struct { - Id [20]byte - tracker.Peer + Id [20]byte + IP net.IP + Port int } func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) { @@ -394,6 +412,7 @@ type Client struct { HalfOpenLimit int PeerId [20]byte DataReady chan DataSpec + Listener net.Listener sync.Mutex mu *sync.Mutex @@ -415,7 +434,16 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er err = errors.New("unknown torrent") return } - index := peer_protocol.Integer(off / int64(t.PieceLength(0))) + index := peer_protocol.Integer(off / t.MetaInfo.PieceLength) + // Reading outside the bounds of a file is an error. + if index < 0 { + err = os.ErrInvalid + return + } + if int(index) >= len(t.Pieces) { + err = io.EOF + return + } piece := t.Pieces[index] if !piece.EverHashed { cl.queuePieceCheck(t, index) @@ -459,6 +487,24 @@ func (c *Client) Start() { if err != nil { panic("error generating peer id") } + if c.Listener != nil { + go c.acceptConnections() + } +} + +func (cl *Client) acceptConnections() { + for { + conn, err := cl.Listener.Accept() + if err != nil { + log.Print(err) + return + } + go func() { + if err := cl.runConnection(conn, nil); err != nil { + log.Print(err) + } + }() + } } func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan, err error) { @@ -539,7 +585,7 @@ func (me *Client) initiateConn(peer Peer, torrent *Torrent) { return } log.Printf("connected to %s", conn.RemoteAddr()) - err = me.runConnection(conn, torrent, peer.Id) + err = me.runConnection(conn, torrent) if err != nil { log.Print(err) } @@ -564,7 +610,7 @@ func (me *Torrent) haveAnyPieces() bool { return false } -func (me *Client) runConnection(sock net.Conn, torrent *Torrent, peerId [20]byte) (err error) { +func (me *Client) runConnection(sock net.Conn, torrent *Torrent) (err error) { conn := &Connection{ Socket: sock, Choked: true, @@ -676,15 +722,36 @@ func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error { me.peerUnchoked(torrent, conn) case peer_protocol.Interested: conn.PeerInterested = true + // TODO: This should be done from a dedicated unchoking routine. + conn.Unchoke() case peer_protocol.NotInterested: conn.PeerInterested = false case peer_protocol.Have: me.peerGotPiece(torrent, conn, int(msg.Index)) case peer_protocol.Request: - conn.PeerRequests[Request{ + if conn.PeerRequests == nil { + conn.PeerRequests = make(map[Request]struct{}, maxRequests) + } + request := Request{ Index: msg.Index, ChunkSpec: ChunkSpec{msg.Begin, msg.Length}, - }] = struct{}{} + } + conn.PeerRequests[request] = struct{}{} + // TODO: Requests should be satisfied from a dedicated upload routine. + p := make([]byte, msg.Length) + n, err := torrent.Data.ReadAt(p, int64(torrent.PieceLength(0))*int64(msg.Index)+int64(msg.Begin)) + if err != nil { + return fmt.Errorf("reading torrent data to serve request %s: %s", request, err) + } + if n != int(msg.Length) { + return fmt.Errorf("bad request: %s", msg) + } + conn.Post(peer_protocol.Message{ + Type: peer_protocol.Piece, + Index: msg.Index, + Begin: msg.Begin, + Piece: p, + }) case peer_protocol.Bitfield: if len(msg.Bitfield) < len(torrent.Pieces) { err = errors.New("received invalid bitfield") @@ -827,10 +894,9 @@ func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error { } me.torrents[torrent.InfoHash] = torrent go me.announceTorrent(torrent) - go me.PrioritizeDataRegion(torrent.InfoHash, 0, torrent.Length()) - // for i := range torrent.Pieces { - // me.queuePieceCheck(torrent, peer_protocol.Integer(i)) - // } + for i := range torrent.Pieces { + me.queuePieceCheck(torrent, peer_protocol.Integer(i)) + } return nil } @@ -857,7 +923,8 @@ newAnnounce: var peers []Peer for _, peer := range resp.Peers { peers = append(peers, Peer{ - Peer: peer, + IP: peer.IP, + Port: peer.Port, }) } if err := cl.AddPeers(t.InfoHash, peers); err != nil { @@ -943,7 +1010,6 @@ func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message) delete(torrent.Pieces[request.Index].PendingChunkSpecs, request.ChunkSpec) if len(torrent.Pieces[request.Index].PendingChunkSpecs) == 0 { me.queuePieceCheck(torrent, request.Index) - return } var next *list.Element for e := torrent.Priorities.Front(); e != nil; e = next { diff --git a/client_test.go b/client_test.go new file mode 100644 index 00000000..ec15ee46 --- /dev/null +++ b/client_test.go @@ -0,0 +1,17 @@ +package torrent + +import ( + "testing" +) + +func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) { + t.SkipNow() +} + +func TestAddTorrentNoUsableURLs(t *testing.T) { + t.SkipNow() +} + +func TestAddPeersToUnknownTorrent(t *testing.T) { + t.SkipNow() +} diff --git a/cmd/torrent-create/main.go b/cmd/torrent-create/main.go index 5bd78425..2ff1d84f 100644 --- a/cmd/torrent-create/main.go +++ b/cmd/torrent-create/main.go @@ -10,7 +10,11 @@ import ( ) var ( - builtinAnnounceGroups = [][]string{{"udp://tracker.openbittorrent.com:80"}, {"udp://tracker.publicbt.com:80"}, {"udp://tracker.istole.it:6969"}} + builtinAnnounceList = [][]string{ + {"udp://tracker.openbittorrent.com:80"}, + {"udp://tracker.publicbt.com:80"}, + {"udp://tracker.istole.it:6969"}, + } ) func init() { @@ -32,7 +36,7 @@ func main() { log.Print(err) } } - for _, group := range builtinAnnounceGroups { + for _, group := range builtinAnnounceList { b.AddAnnounceGroup(group) } batch, err := b.Submit() diff --git a/cmd/torrentfs/main.go b/cmd/torrentfs/main.go index be2295fb..f13774c1 100644 --- a/cmd/torrentfs/main.go +++ b/cmd/torrentfs/main.go @@ -23,10 +23,6 @@ var ( mountDir string ) -const ( - defaultMode = 0555 -) - func init() { flag.StringVar(&downloadDir, "downloadDir", "", "location to save torrent data") flag.StringVar(&torrentPath, "torrentPath", func() string { @@ -39,238 +35,6 @@ func init() { flag.StringVar(&mountDir, "mountDir", "", "location the torrent contents are made available") } -type TorrentFS struct { - Client *torrent.Client - DataSubs map[chan torrent.DataSpec]struct{} - sync.Mutex -} - -func (tfs *TorrentFS) publishData() { - for { - spec := <-tfs.Client.DataReady - tfs.Lock() - for ds := range tfs.DataSubs { - ds <- spec - } - tfs.Unlock() - } -} - -func (tfs *TorrentFS) SubscribeData() chan torrent.DataSpec { - ch := make(chan torrent.DataSpec) - tfs.Lock() - tfs.DataSubs[ch] = struct{}{} - tfs.Unlock() - return ch -} - -func (tfs *TorrentFS) UnsubscribeData(ch chan torrent.DataSpec) { - go func() { - for _ = range ch { - } - }() - tfs.Lock() - delete(tfs.DataSubs, ch) - tfs.Unlock() - close(ch) -} - -type rootNode struct { - fs *TorrentFS -} - -type node struct { - path []string - metaInfo *metainfo.MetaInfo - FS *TorrentFS - InfoHash torrent.InfoHash -} - -type fileNode struct { - node - size uint64 - TorrentOffset int64 -} - -func (fn fileNode) Attr() (attr fuse.Attr) { - attr.Size = fn.size - attr.Mode = defaultMode - return -} - -func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fusefs.Intr) fuse.Error { - if req.Dir { - panic("hodor") - } - dataSpecs := fn.FS.SubscribeData() - defer fn.FS.UnsubscribeData(dataSpecs) - data := make([]byte, func() int { - _len := int64(fn.size) - req.Offset - if int64(req.Size) < _len { - return req.Size - } else { - // limit read to the end of the file - return int(_len) - } - }()) - infoHash := torrent.BytesInfoHash(fn.metaInfo.InfoHash) - torrentOff := fn.TorrentOffset + req.Offset - fn.FS.Client.PrioritizeDataRegion(infoHash, torrentOff, int64(len(data))) - for { - n, err := fn.FS.Client.TorrentReadAt(infoHash, torrentOff, data) - // log.Println(torrentOff, len(data), n, err) - switch err { - case nil: - resp.Data = data[:n] - return nil - case torrent.ErrDataNotReady: - select { - case <-dataSpecs: - case <-intr: - return fuse.EINTR - } - default: - log.Print(err) - return fuse.EIO - } - } -} - -type dirNode struct { - node -} - -var ( - _ fusefs.HandleReadDirer = dirNode{} - - _ fusefs.HandleReader = fileNode{} -) - -func isSubPath(parent, child []string) bool { - if len(child) <= len(parent) { - return false - } - for i := range parent { - if parent[i] != child[i] { - return false - } - } - return true -} - -func (dn dirNode) ReadDir(intr fusefs.Intr) (des []fuse.Dirent, err fuse.Error) { - names := map[string]bool{} - for _, fi := range dn.metaInfo.Files { - if !isSubPath(dn.path, fi.Path) { - continue - } - name := fi.Path[len(dn.path)] - if names[name] { - continue - } - names[name] = true - de := fuse.Dirent{ - Name: name, - } - if len(fi.Path) == len(dn.path)+1 { - de.Type = fuse.DT_File - } else { - de.Type = fuse.DT_Dir - } - des = append(des, de) - } - return -} - -func (dn dirNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err fuse.Error) { - var torrentOffset int64 - for _, fi := range dn.metaInfo.Files { - if !isSubPath(dn.path, fi.Path) { - torrentOffset += fi.Length - continue - } - if fi.Path[len(dn.path)] != name { - torrentOffset += fi.Length - continue - } - __node := dn.node - __node.path = append(__node.path, name) - if len(fi.Path) == len(dn.path)+1 { - _node = fileNode{ - node: __node, - size: uint64(fi.Length), - TorrentOffset: torrentOffset, - } - } else { - _node = dirNode{__node} - } - break - } - if _node == nil { - err = fuse.ENOENT - } - return -} - -func (dn dirNode) Attr() (attr fuse.Attr) { - attr.Mode = os.ModeDir | defaultMode - return -} - -func isSingleFileTorrent(mi *metainfo.MetaInfo) bool { - return len(mi.Files) == 1 && mi.Files[0].Path == nil -} - -func (me rootNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err fuse.Error) { - for _, _torrent := range me.fs.Client.Torrents() { - metaInfo := _torrent.MetaInfo - if metaInfo.Name == name { - __node := node{ - metaInfo: metaInfo, - FS: me.fs, - InfoHash: torrent.BytesInfoHash(metaInfo.InfoHash), - } - if isSingleFileTorrent(metaInfo) { - _node = fileNode{__node, uint64(metaInfo.Files[0].Length), 0} - } else { - _node = dirNode{__node} - } - break - } - } - if _node == nil { - err = fuse.ENOENT - } - return -} - -func (me rootNode) ReadDir(intr fusefs.Intr) (dirents []fuse.Dirent, err fuse.Error) { - for _, _torrent := range me.fs.Client.Torrents() { - metaInfo := _torrent.MetaInfo - dirents = append(dirents, fuse.Dirent{ - Name: metaInfo.Name, - Type: func() fuse.DirentType { - if isSingleFileTorrent(metaInfo) { - return fuse.DT_File - } else { - return fuse.DT_Dir - } - }(), - }) - } - return -} - -func (rootNode) Attr() fuse.Attr { - return fuse.Attr{ - Mode: os.ModeDir, - } -} - -func (tfs *TorrentFS) Root() (fusefs.Node, fuse.Error) { - return rootNode{tfs}, nil -} - func main() { pprofAddr := flag.String("pprofAddr", "", "pprof HTTP server bind address") testPeer := flag.String("testPeer", "", "the address for a test peer") @@ -279,10 +43,8 @@ func main() { if *pprofAddr != "" { go http.ListenAndServe(*pprofAddr, nil) } - // defer profile.Start(profile.CPUProfile).Stop() client := &torrent.Client{ DataDir: downloadDir, - DataReady: make(chan torrent.DataSpec), HalfOpenLimit: 2, } client.Start() diff --git a/cmd/torrentfs/main_test.go b/cmd/torrentfs/main_test.go deleted file mode 100644 index e4ea512e..00000000 --- a/cmd/torrentfs/main_test.go +++ /dev/null @@ -1,28 +0,0 @@ -package main - -import ( - "net" - "testing" -) - -func TestTCPAddrString(t *testing.T) { - ta := &net.TCPAddr{ - IP: net.IPv4(127, 0, 0, 1), - Port: 3000, - } - s := ta.String() - l, err := net.Listen("tcp4", "localhost:3000") - if err != nil { - t.Fatal(err) - } - defer l.Close() - c, err := net.Dial("tcp", l.Addr().String()) - if err != nil { - t.Fatal(err) - } - defer c.Close() - ras := c.RemoteAddr().String() - if ras != s { - t.FailNow() - } -} diff --git a/fs/torrentfs.go b/fs/torrentfs.go new file mode 100644 index 00000000..b6fcbe51 --- /dev/null +++ b/fs/torrentfs.go @@ -0,0 +1,260 @@ +package torrentfs + +import ( + "bazil.org/fuse" + fusefs "bazil.org/fuse/fs" + "bitbucket.org/anacrolix/go.torrent" + metainfo "github.com/nsf/libtorgo/torrent" + "os" + "sync" +) + +const ( + defaultMode = 0555 +) + +type torrentFS struct { + Client *torrent.Client + DataSubs map[chan torrent.DataSpec]struct{} + sync.Mutex +} + +func (tfs *torrentFS) publishData() { + for { + spec := <-tfs.Client.DataReady + tfs.Lock() + for ds := range tfs.DataSubs { + ds <- spec + } + tfs.Unlock() + } +} + +func (tfs *torrentFS) SubscribeData() chan torrent.DataSpec { + ch := make(chan torrent.DataSpec) + tfs.Lock() + tfs.DataSubs[ch] = struct{}{} + tfs.Unlock() + return ch +} + +func (tfs *torrentFS) UnsubscribeData(ch chan torrent.DataSpec) { + go func() { + for _ = range ch { + } + }() + tfs.Lock() + delete(tfs.DataSubs, ch) + tfs.Unlock() + close(ch) +} + +type rootNode struct { + fs *torrentFS +} + +type node struct { + path []string + metaInfo *metainfo.MetaInfo + FS *torrentFS + InfoHash torrent.InfoHash +} + +type fileNode struct { + node + size uint64 + TorrentOffset int64 +} + +func (fn fileNode) Attr() (attr fuse.Attr) { + attr.Size = fn.size + attr.Mode = defaultMode + return +} + +func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fusefs.Intr) fuse.Error { + if req.Dir { + panic("hodor") + } + dataSpecs := fn.FS.SubscribeData() + defer fn.FS.UnsubscribeData(dataSpecs) + data := make([]byte, func() int { + _len := int64(fn.size) - req.Offset + if int64(req.Size) < _len { + return req.Size + } else { + // limit read to the end of the file + return int(_len) + } + }()) + if len(data) == 0 { + return nil + } + infoHash := torrent.BytesInfoHash(fn.metaInfo.InfoHash) + torrentOff := fn.TorrentOffset + req.Offset + fn.FS.Client.PrioritizeDataRegion(infoHash, torrentOff, int64(len(data))) + for { + n, err := fn.FS.Client.TorrentReadAt(infoHash, torrentOff, data) + switch err { + case nil: + resp.Data = data[:n] + return nil + case torrent.ErrDataNotReady: + select { + case <-dataSpecs: + case <-intr: + return fuse.EINTR + } + default: + return fuse.EIO + } + } +} + +type dirNode struct { + node +} + +var ( + _ fusefs.HandleReadDirer = dirNode{} + + _ fusefs.HandleReader = fileNode{} +) + +func isSubPath(parent, child []string) bool { + if len(child) <= len(parent) { + return false + } + for i := range parent { + if parent[i] != child[i] { + return false + } + } + return true +} + +func (dn dirNode) ReadDir(intr fusefs.Intr) (des []fuse.Dirent, err fuse.Error) { + names := map[string]bool{} + for _, fi := range dn.metaInfo.Files { + if !isSubPath(dn.path, fi.Path) { + continue + } + name := fi.Path[len(dn.path)] + if names[name] { + continue + } + names[name] = true + de := fuse.Dirent{ + Name: name, + } + if len(fi.Path) == len(dn.path)+1 { + de.Type = fuse.DT_File + } else { + de.Type = fuse.DT_Dir + } + des = append(des, de) + } + return +} + +func (dn dirNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err fuse.Error) { + var torrentOffset int64 + for _, fi := range dn.metaInfo.Files { + if !isSubPath(dn.path, fi.Path) { + torrentOffset += fi.Length + continue + } + if fi.Path[len(dn.path)] != name { + torrentOffset += fi.Length + continue + } + __node := dn.node + __node.path = append(__node.path, name) + if len(fi.Path) == len(dn.path)+1 { + _node = fileNode{ + node: __node, + size: uint64(fi.Length), + TorrentOffset: torrentOffset, + } + } else { + _node = dirNode{__node} + } + break + } + if _node == nil { + err = fuse.ENOENT + } + return +} + +func (dn dirNode) Attr() (attr fuse.Attr) { + attr.Mode = os.ModeDir | defaultMode + return +} + +func isSingleFileTorrent(mi *metainfo.MetaInfo) bool { + return len(mi.Files) == 1 && mi.Files[0].Path == nil +} + +func (me rootNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err fuse.Error) { + for _, _torrent := range me.fs.Client.Torrents() { + metaInfo := _torrent.MetaInfo + if metaInfo.Name == name { + __node := node{ + metaInfo: metaInfo, + FS: me.fs, + InfoHash: torrent.BytesInfoHash(metaInfo.InfoHash), + } + if isSingleFileTorrent(metaInfo) { + _node = fileNode{__node, uint64(metaInfo.Files[0].Length), 0} + } else { + _node = dirNode{__node} + } + break + } + } + if _node == nil { + err = fuse.ENOENT + } + return +} + +func (me rootNode) ReadDir(intr fusefs.Intr) (dirents []fuse.Dirent, err fuse.Error) { + for _, _torrent := range me.fs.Client.Torrents() { + metaInfo := _torrent.MetaInfo + dirents = append(dirents, fuse.Dirent{ + Name: metaInfo.Name, + Type: func() fuse.DirentType { + if isSingleFileTorrent(metaInfo) { + return fuse.DT_File + } else { + return fuse.DT_Dir + } + }(), + }) + } + return +} + +func (rootNode) Attr() fuse.Attr { + return fuse.Attr{ + Mode: os.ModeDir, + } +} + +func (tfs *torrentFS) Root() (fusefs.Node, fuse.Error) { + return rootNode{tfs}, nil +} + +func MountAndServe(dir string, cl *torrent.Client) error { + conn, err := fuse.Mount(dir) + if err != nil { + return err + } + fs := &torrentFS{ + Client: cl, + DataSubs: make(map[chan torrent.DataSpec]struct{}), + } + go fs.publishData() + return fusefs.Serve(conn, fs) +} diff --git a/fs/torrentfs_test.go b/fs/torrentfs_test.go new file mode 100644 index 00000000..962d6e26 --- /dev/null +++ b/fs/torrentfs_test.go @@ -0,0 +1,103 @@ +package torrentfs + +import ( + "bitbucket.org/anacrolix/go.torrent" + "bytes" + metainfo "github.com/nsf/libtorgo/torrent" + "io" + "io/ioutil" + "net" + "os" + "path/filepath" + "testing" +) + +func TestTCPAddrString(t *testing.T) { + ta := &net.TCPAddr{ + IP: net.IPv4(127, 0, 0, 1), + Port: 3000, + } + s := ta.String() + l, err := net.Listen("tcp4", "localhost:3000") + if err != nil { + t.Fatal(err) + } + defer l.Close() + c, err := net.Dial("tcp", l.Addr().String()) + if err != nil { + t.Fatal(err) + } + defer c.Close() + ras := c.RemoteAddr().String() + if ras != s { + t.FailNow() + } +} + +func createDummyTorrentData(dirName string) string { + f, _ := os.Create(filepath.Join(dirName, "greeting")) + f.WriteString("hello, world\n") + return f.Name() +} + +func createMetaInfo(name string, w io.Writer) { + builder := metainfo.Builder{} + builder.AddFile(name) + builder.AddAnnounceGroup([]string{"lol://cheezburger"}) + batch, err := builder.Submit() + if err != nil { + panic(err) + } + errs, _ := batch.Start(w, 1) + <-errs +} + +func TestDownloadOnDemand(t *testing.T) { + dir, err := ioutil.TempDir("", "torrentfs") + if err != nil { + t.Fatal(err) + } + defer func() { + if err := os.RemoveAll(dir); err != nil { + t.Error(err) + } + }() + t.Logf("test directory: %s", dir) + finishedDir := filepath.Join(dir, "finished") + os.Mkdir(finishedDir, 0777) + name := createDummyTorrentData(finishedDir) + metaInfoBuf := &bytes.Buffer{} + createMetaInfo(name, metaInfoBuf) + metaInfo, err := metainfo.Load(metaInfoBuf) + seeder := torrent.Client{ + DataDir: finishedDir, + Listener: func() net.Listener { + conn, err := net.Listen("tcp", ":0") + if err != nil { + panic(err) + } + return conn + }(), + } + seeder.Start() + seeder.AddTorrent(metaInfo) + leecher := torrent.Client{ + DataDir: filepath.Join(dir, "download"), + DataReady: make(chan torrent.DataSpec), + } + leecher.Start() + leecher.AddTorrent(metaInfo) + leecher.AddPeers(torrent.BytesInfoHash(metaInfo.InfoHash), []torrent.Peer{func() torrent.Peer { + tcpAddr := seeder.Listener.Addr().(*net.TCPAddr) + return torrent.Peer{ + IP: tcpAddr.IP, + Port: tcpAddr.Port, + } + }()}) + mountDir := filepath.Join(dir, "mnt") + os.Mkdir(mountDir, 0777) + err = MountAndServe(mountDir, &leecher) + if err != nil { + t.Fatal(err) + } +} diff --git a/peer_protocol/protocol.go b/peer_protocol/protocol.go index 2f9547ef..281d8486 100644 --- a/peer_protocol/protocol.go +++ b/peer_protocol/protocol.go @@ -24,15 +24,15 @@ const ( ) const ( - Choke MessageType = iota - Unchoke - Interested - NotInterested - Have - Bitfield - Request - Piece - Cancel + Choke MessageType = iota + Unchoke // 1 + Interested // 2 + NotInterested // 3 + Have // 4 + Bitfield // 5 + Request // 6 + Piece // 7 + Cancel // 8 ) type Message struct { @@ -66,8 +66,22 @@ func (msg Message) MarshalBinary() (data []byte, err error) { } case Bitfield: _, err = buf.Write(marshalBitfield(msg.Bitfield)) + case Piece: + for _, i := range []Integer{msg.Index, msg.Begin} { + err = binary.Write(buf, binary.BigEndian, i) + if err != nil { + return + } + } + n, err := buf.Write(msg.Piece) + if err != nil { + break + } + if n != len(msg.Piece) { + panic(n) + } default: - err = errors.New("unknown message type") + err = fmt.Errorf("unknown message type: %s", msg.Type) } data = make([]byte, 4+buf.Len()) binary.BigEndian.PutUint32(data, uint32(buf.Len())) @@ -114,7 +128,12 @@ func (d *Decoder) Decode(msg *Message) (err error) { case Have: err = msg.Index.Read(r) case Request, Cancel: - err = binary.Read(r, binary.BigEndian, []*Integer{&msg.Index, &msg.Begin, &msg.Length}) + for _, data := range []*Integer{&msg.Index, &msg.Begin, &msg.Length} { + err = data.Read(r) + if err != nil { + break + } + } case Bitfield: b := make([]byte, length-1) _, err = io.ReadFull(r, b) diff --git a/peer_protocol/protocol_test.go b/peer_protocol/protocol_test.go index caebd8ef..e9e45352 100644 --- a/peer_protocol/protocol_test.go +++ b/peer_protocol/protocol_test.go @@ -1,9 +1,27 @@ package peer_protocol import ( + "bytes" "testing" ) +func TestBinaryReadSliceOfPointers(t *testing.T) { + var msg Message + r := bytes.NewBufferString("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00@\x00") + if r.Len() != 12 { + t.Fatalf("expected 12 bytes left, but there %d", r.Len()) + } + for _, data := range []*Integer{&msg.Index, &msg.Begin, &msg.Length} { + err := data.Read(r) + if err != nil { + t.Fatal(err) + } + } + if r.Len() != 0 { + t.FailNow() + } +} + func TestConstants(t *testing.T) { // check that iota works as expected in the const block if NotInterested != 3 { -- 2.48.1