]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add a end-to-end test for torrentfs
authorMatt Joiner <anacrolix@gmail.com>
Mon, 17 Mar 2014 14:44:22 +0000 (01:44 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 17 Mar 2014 14:44:22 +0000 (01:44 +1100)
Basic unchoking and uploading.
Accept incoming connections.
Break out torrentfs lib.
Fix and implement some protocol stuff.

client.go
client_test.go [new file with mode: 0644]
cmd/torrent-create/main.go
cmd/torrentfs/main.go
cmd/torrentfs/main_test.go [deleted file]
fs/torrentfs.go [new file with mode: 0644]
fs/torrentfs_test.go [new file with mode: 0644]
peer_protocol/protocol.go
peer_protocol/protocol_test.go

index f874daba7e752c44b2151a509acea0da33b5fd80..83d0d6a082133f05d4bebe595b6d8f462ae31927 100644 (file)
--- a/client.go
+++ b/client.go
@@ -91,10 +91,12 @@ type Connection struct {
        post   chan encoding.BinaryMarshaler
        write  chan []byte
 
+       // Stuff controlled by the local peer.
        Interested bool
        Choked     bool
        Requests   map[Request]struct{}
 
+       // Stuff controlled by the remote peer.
        PeerId         [20]byte
        PeerInterested bool
        PeerChoked     bool
@@ -142,6 +144,16 @@ func (c *Connection) Request(chunk Request) bool {
        return true
 }
 
+func (c *Connection) Unchoke() {
+       if !c.Choked {
+               return
+       }
+       c.Post(peer_protocol.Message{
+               Type: peer_protocol.Unchoke,
+       })
+       c.Choked = false
+}
+
 func (c *Connection) SetInterested(interested bool) {
        if c.Interested == interested {
                return
@@ -282,19 +294,24 @@ func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer)
 }
 
 func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
+       log.Print(len_)
        cl.mu.Lock()
        defer cl.mu.Unlock()
        t := cl.torrent(ih)
        newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize)
-       for len_ > 0 {
-               index := peer_protocol.Integer(off / int64(t.PieceLength(0)))
-               pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
+       for len_ != 0 {
+               // TODO: Write a function to return the Request for a given offset.
+               index := peer_protocol.Integer(off / t.MetaInfo.PieceLength)
+               pieceOff := peer_protocol.Integer(off % t.MetaInfo.PieceLength)
                piece := t.Pieces[index]
                if !piece.EverHashed {
                        cl.queuePieceCheck(t, index)
                }
                chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize}
-               adv := int64(chunkSize - pieceOff%chunkSize)
+               if int64(chunk.Length) > len_ {
+                       chunk.Length = peer_protocol.Integer(len_)
+               }
+               adv := int64(chunk.Length - pieceOff%chunkSize)
                off += adv
                len_ -= adv
                if _, ok := piece.PendingChunkSpecs[chunk]; !ok && !piece.Hashing {
@@ -357,8 +374,9 @@ func (t *Torrent) requestHeat() (ret map[Request]int) {
 }
 
 type Peer struct {
-       Id [20]byte
-       tracker.Peer
+       Id   [20]byte
+       IP   net.IP
+       Port int
 }
 
 func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) {
@@ -394,6 +412,7 @@ type Client struct {
        HalfOpenLimit int
        PeerId        [20]byte
        DataReady     chan DataSpec
+       Listener      net.Listener
 
        sync.Mutex
        mu    *sync.Mutex
@@ -415,7 +434,16 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er
                err = errors.New("unknown torrent")
                return
        }
-       index := peer_protocol.Integer(off / int64(t.PieceLength(0)))
+       index := peer_protocol.Integer(off / t.MetaInfo.PieceLength)
+       // Reading outside the bounds of a file is an error.
+       if index < 0 {
+               err = os.ErrInvalid
+               return
+       }
+       if int(index) >= len(t.Pieces) {
+               err = io.EOF
+               return
+       }
        piece := t.Pieces[index]
        if !piece.EverHashed {
                cl.queuePieceCheck(t, index)
@@ -459,6 +487,24 @@ func (c *Client) Start() {
        if err != nil {
                panic("error generating peer id")
        }
+       if c.Listener != nil {
+               go c.acceptConnections()
+       }
+}
+
+func (cl *Client) acceptConnections() {
+       for {
+               conn, err := cl.Listener.Accept()
+               if err != nil {
+                       log.Print(err)
+                       return
+               }
+               go func() {
+                       if err := cl.runConnection(conn, nil); err != nil {
+                               log.Print(err)
+                       }
+               }()
+       }
 }
 
 func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan, err error) {
@@ -539,7 +585,7 @@ func (me *Client) initiateConn(peer Peer, torrent *Torrent) {
                        return
                }
                log.Printf("connected to %s", conn.RemoteAddr())
-               err = me.runConnection(conn, torrent, peer.Id)
+               err = me.runConnection(conn, torrent)
                if err != nil {
                        log.Print(err)
                }
@@ -564,7 +610,7 @@ func (me *Torrent) haveAnyPieces() bool {
        return false
 }
 
-func (me *Client) runConnection(sock net.Conn, torrent *Torrent, peerId [20]byte) (err error) {
+func (me *Client) runConnection(sock net.Conn, torrent *Torrent) (err error) {
        conn := &Connection{
                Socket:     sock,
                Choked:     true,
@@ -676,15 +722,36 @@ func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error {
                        me.peerUnchoked(torrent, conn)
                case peer_protocol.Interested:
                        conn.PeerInterested = true
+                       // TODO: This should be done from a dedicated unchoking routine.
+                       conn.Unchoke()
                case peer_protocol.NotInterested:
                        conn.PeerInterested = false
                case peer_protocol.Have:
                        me.peerGotPiece(torrent, conn, int(msg.Index))
                case peer_protocol.Request:
-                       conn.PeerRequests[Request{
+                       if conn.PeerRequests == nil {
+                               conn.PeerRequests = make(map[Request]struct{}, maxRequests)
+                       }
+                       request := Request{
                                Index:     msg.Index,
                                ChunkSpec: ChunkSpec{msg.Begin, msg.Length},
-                       }] = struct{}{}
+                       }
+                       conn.PeerRequests[request] = struct{}{}
+                       // TODO: Requests should be satisfied from a dedicated upload routine.
+                       p := make([]byte, msg.Length)
+                       n, err := torrent.Data.ReadAt(p, int64(torrent.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
+                       if err != nil {
+                               return fmt.Errorf("reading torrent data to serve request %s: %s", request, err)
+                       }
+                       if n != int(msg.Length) {
+                               return fmt.Errorf("bad request: %s", msg)
+                       }
+                       conn.Post(peer_protocol.Message{
+                               Type:  peer_protocol.Piece,
+                               Index: msg.Index,
+                               Begin: msg.Begin,
+                               Piece: p,
+                       })
                case peer_protocol.Bitfield:
                        if len(msg.Bitfield) < len(torrent.Pieces) {
                                err = errors.New("received invalid bitfield")
@@ -827,10 +894,9 @@ func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
        }
        me.torrents[torrent.InfoHash] = torrent
        go me.announceTorrent(torrent)
-       go me.PrioritizeDataRegion(torrent.InfoHash, 0, torrent.Length())
-       // for i := range torrent.Pieces {
-       //      me.queuePieceCheck(torrent, peer_protocol.Integer(i))
-       // }
+       for i := range torrent.Pieces {
+               me.queuePieceCheck(torrent, peer_protocol.Integer(i))
+       }
        return nil
 }
 
@@ -857,7 +923,8 @@ newAnnounce:
                                var peers []Peer
                                for _, peer := range resp.Peers {
                                        peers = append(peers, Peer{
-                                               Peer: peer,
+                                               IP:   peer.IP,
+                                               Port: peer.Port,
                                        })
                                }
                                if err := cl.AddPeers(t.InfoHash, peers); err != nil {
@@ -943,7 +1010,6 @@ func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message)
        delete(torrent.Pieces[request.Index].PendingChunkSpecs, request.ChunkSpec)
        if len(torrent.Pieces[request.Index].PendingChunkSpecs) == 0 {
                me.queuePieceCheck(torrent, request.Index)
-               return
        }
        var next *list.Element
        for e := torrent.Priorities.Front(); e != nil; e = next {
diff --git a/client_test.go b/client_test.go
new file mode 100644 (file)
index 0000000..ec15ee4
--- /dev/null
@@ -0,0 +1,17 @@
+package torrent
+
+import (
+       "testing"
+)
+
+func TestAddTorrentNoSupportedTrackerSchemes(t *testing.T) {
+       t.SkipNow()
+}
+
+func TestAddTorrentNoUsableURLs(t *testing.T) {
+       t.SkipNow()
+}
+
+func TestAddPeersToUnknownTorrent(t *testing.T) {
+       t.SkipNow()
+}
index 5bd784250800a19e8f5efd78d3cfd186dac56442..2ff1d84fb2d296dc847a58c78da579f6e3f831a5 100644 (file)
@@ -10,7 +10,11 @@ import (
 )
 
 var (
-       builtinAnnounceGroups = [][]string{{"udp://tracker.openbittorrent.com:80"}, {"udp://tracker.publicbt.com:80"}, {"udp://tracker.istole.it:6969"}}
+       builtinAnnounceList = [][]string{
+               {"udp://tracker.openbittorrent.com:80"},
+               {"udp://tracker.publicbt.com:80"},
+               {"udp://tracker.istole.it:6969"},
+       }
 )
 
 func init() {
@@ -32,7 +36,7 @@ func main() {
                        log.Print(err)
                }
        }
-       for _, group := range builtinAnnounceGroups {
+       for _, group := range builtinAnnounceList {
                b.AddAnnounceGroup(group)
        }
        batch, err := b.Submit()
index be2295fb846988ec1b0fde58e9d6e3fd6a469822..f13774c1626bef3515e9bb13334b7fda34009f30 100644 (file)
@@ -23,10 +23,6 @@ var (
        mountDir    string
 )
 
-const (
-       defaultMode = 0555
-)
-
 func init() {
        flag.StringVar(&downloadDir, "downloadDir", "", "location to save torrent data")
        flag.StringVar(&torrentPath, "torrentPath", func() string {
@@ -39,238 +35,6 @@ func init() {
        flag.StringVar(&mountDir, "mountDir", "", "location the torrent contents are made available")
 }
 
-type TorrentFS struct {
-       Client   *torrent.Client
-       DataSubs map[chan torrent.DataSpec]struct{}
-       sync.Mutex
-}
-
-func (tfs *TorrentFS) publishData() {
-       for {
-               spec := <-tfs.Client.DataReady
-               tfs.Lock()
-               for ds := range tfs.DataSubs {
-                       ds <- spec
-               }
-               tfs.Unlock()
-       }
-}
-
-func (tfs *TorrentFS) SubscribeData() chan torrent.DataSpec {
-       ch := make(chan torrent.DataSpec)
-       tfs.Lock()
-       tfs.DataSubs[ch] = struct{}{}
-       tfs.Unlock()
-       return ch
-}
-
-func (tfs *TorrentFS) UnsubscribeData(ch chan torrent.DataSpec) {
-       go func() {
-               for _ = range ch {
-               }
-       }()
-       tfs.Lock()
-       delete(tfs.DataSubs, ch)
-       tfs.Unlock()
-       close(ch)
-}
-
-type rootNode struct {
-       fs *TorrentFS
-}
-
-type node struct {
-       path     []string
-       metaInfo *metainfo.MetaInfo
-       FS       *TorrentFS
-       InfoHash torrent.InfoHash
-}
-
-type fileNode struct {
-       node
-       size          uint64
-       TorrentOffset int64
-}
-
-func (fn fileNode) Attr() (attr fuse.Attr) {
-       attr.Size = fn.size
-       attr.Mode = defaultMode
-       return
-}
-
-func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fusefs.Intr) fuse.Error {
-       if req.Dir {
-               panic("hodor")
-       }
-       dataSpecs := fn.FS.SubscribeData()
-       defer fn.FS.UnsubscribeData(dataSpecs)
-       data := make([]byte, func() int {
-               _len := int64(fn.size) - req.Offset
-               if int64(req.Size) < _len {
-                       return req.Size
-               } else {
-                       // limit read to the end of the file
-                       return int(_len)
-               }
-       }())
-       infoHash := torrent.BytesInfoHash(fn.metaInfo.InfoHash)
-       torrentOff := fn.TorrentOffset + req.Offset
-       fn.FS.Client.PrioritizeDataRegion(infoHash, torrentOff, int64(len(data)))
-       for {
-               n, err := fn.FS.Client.TorrentReadAt(infoHash, torrentOff, data)
-               // log.Println(torrentOff, len(data), n, err)
-               switch err {
-               case nil:
-                       resp.Data = data[:n]
-                       return nil
-               case torrent.ErrDataNotReady:
-                       select {
-                       case <-dataSpecs:
-                       case <-intr:
-                               return fuse.EINTR
-                       }
-               default:
-                       log.Print(err)
-                       return fuse.EIO
-               }
-       }
-}
-
-type dirNode struct {
-       node
-}
-
-var (
-       _ fusefs.HandleReadDirer = dirNode{}
-
-       _ fusefs.HandleReader = fileNode{}
-)
-
-func isSubPath(parent, child []string) bool {
-       if len(child) <= len(parent) {
-               return false
-       }
-       for i := range parent {
-               if parent[i] != child[i] {
-                       return false
-               }
-       }
-       return true
-}
-
-func (dn dirNode) ReadDir(intr fusefs.Intr) (des []fuse.Dirent, err fuse.Error) {
-       names := map[string]bool{}
-       for _, fi := range dn.metaInfo.Files {
-               if !isSubPath(dn.path, fi.Path) {
-                       continue
-               }
-               name := fi.Path[len(dn.path)]
-               if names[name] {
-                       continue
-               }
-               names[name] = true
-               de := fuse.Dirent{
-                       Name: name,
-               }
-               if len(fi.Path) == len(dn.path)+1 {
-                       de.Type = fuse.DT_File
-               } else {
-                       de.Type = fuse.DT_Dir
-               }
-               des = append(des, de)
-       }
-       return
-}
-
-func (dn dirNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err fuse.Error) {
-       var torrentOffset int64
-       for _, fi := range dn.metaInfo.Files {
-               if !isSubPath(dn.path, fi.Path) {
-                       torrentOffset += fi.Length
-                       continue
-               }
-               if fi.Path[len(dn.path)] != name {
-                       torrentOffset += fi.Length
-                       continue
-               }
-               __node := dn.node
-               __node.path = append(__node.path, name)
-               if len(fi.Path) == len(dn.path)+1 {
-                       _node = fileNode{
-                               node:          __node,
-                               size:          uint64(fi.Length),
-                               TorrentOffset: torrentOffset,
-                       }
-               } else {
-                       _node = dirNode{__node}
-               }
-               break
-       }
-       if _node == nil {
-               err = fuse.ENOENT
-       }
-       return
-}
-
-func (dn dirNode) Attr() (attr fuse.Attr) {
-       attr.Mode = os.ModeDir | defaultMode
-       return
-}
-
-func isSingleFileTorrent(mi *metainfo.MetaInfo) bool {
-       return len(mi.Files) == 1 && mi.Files[0].Path == nil
-}
-
-func (me rootNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err fuse.Error) {
-       for _, _torrent := range me.fs.Client.Torrents() {
-               metaInfo := _torrent.MetaInfo
-               if metaInfo.Name == name {
-                       __node := node{
-                               metaInfo: metaInfo,
-                               FS:       me.fs,
-                               InfoHash: torrent.BytesInfoHash(metaInfo.InfoHash),
-                       }
-                       if isSingleFileTorrent(metaInfo) {
-                               _node = fileNode{__node, uint64(metaInfo.Files[0].Length), 0}
-                       } else {
-                               _node = dirNode{__node}
-                       }
-                       break
-               }
-       }
-       if _node == nil {
-               err = fuse.ENOENT
-       }
-       return
-}
-
-func (me rootNode) ReadDir(intr fusefs.Intr) (dirents []fuse.Dirent, err fuse.Error) {
-       for _, _torrent := range me.fs.Client.Torrents() {
-               metaInfo := _torrent.MetaInfo
-               dirents = append(dirents, fuse.Dirent{
-                       Name: metaInfo.Name,
-                       Type: func() fuse.DirentType {
-                               if isSingleFileTorrent(metaInfo) {
-                                       return fuse.DT_File
-                               } else {
-                                       return fuse.DT_Dir
-                               }
-                       }(),
-               })
-       }
-       return
-}
-
-func (rootNode) Attr() fuse.Attr {
-       return fuse.Attr{
-               Mode: os.ModeDir,
-       }
-}
-
-func (tfs *TorrentFS) Root() (fusefs.Node, fuse.Error) {
-       return rootNode{tfs}, nil
-}
-
 func main() {
        pprofAddr := flag.String("pprofAddr", "", "pprof HTTP server bind address")
        testPeer := flag.String("testPeer", "", "the address for a test peer")
@@ -279,10 +43,8 @@ func main() {
        if *pprofAddr != "" {
                go http.ListenAndServe(*pprofAddr, nil)
        }
-       // defer profile.Start(profile.CPUProfile).Stop()
        client := &torrent.Client{
                DataDir:       downloadDir,
-               DataReady:     make(chan torrent.DataSpec),
                HalfOpenLimit: 2,
        }
        client.Start()
diff --git a/cmd/torrentfs/main_test.go b/cmd/torrentfs/main_test.go
deleted file mode 100644 (file)
index e4ea512..0000000
+++ /dev/null
@@ -1,28 +0,0 @@
-package main
-
-import (
-       "net"
-       "testing"
-)
-
-func TestTCPAddrString(t *testing.T) {
-       ta := &net.TCPAddr{
-               IP:   net.IPv4(127, 0, 0, 1),
-               Port: 3000,
-       }
-       s := ta.String()
-       l, err := net.Listen("tcp4", "localhost:3000")
-       if err != nil {
-               t.Fatal(err)
-       }
-       defer l.Close()
-       c, err := net.Dial("tcp", l.Addr().String())
-       if err != nil {
-               t.Fatal(err)
-       }
-       defer c.Close()
-       ras := c.RemoteAddr().String()
-       if ras != s {
-               t.FailNow()
-       }
-}
diff --git a/fs/torrentfs.go b/fs/torrentfs.go
new file mode 100644 (file)
index 0000000..b6fcbe5
--- /dev/null
@@ -0,0 +1,260 @@
+package torrentfs
+
+import (
+       "bazil.org/fuse"
+       fusefs "bazil.org/fuse/fs"
+       "bitbucket.org/anacrolix/go.torrent"
+       metainfo "github.com/nsf/libtorgo/torrent"
+       "os"
+       "sync"
+)
+
+const (
+       defaultMode = 0555
+)
+
+type torrentFS struct {
+       Client   *torrent.Client
+       DataSubs map[chan torrent.DataSpec]struct{}
+       sync.Mutex
+}
+
+func (tfs *torrentFS) publishData() {
+       for {
+               spec := <-tfs.Client.DataReady
+               tfs.Lock()
+               for ds := range tfs.DataSubs {
+                       ds <- spec
+               }
+               tfs.Unlock()
+       }
+}
+
+func (tfs *torrentFS) SubscribeData() chan torrent.DataSpec {
+       ch := make(chan torrent.DataSpec)
+       tfs.Lock()
+       tfs.DataSubs[ch] = struct{}{}
+       tfs.Unlock()
+       return ch
+}
+
+func (tfs *torrentFS) UnsubscribeData(ch chan torrent.DataSpec) {
+       go func() {
+               for _ = range ch {
+               }
+       }()
+       tfs.Lock()
+       delete(tfs.DataSubs, ch)
+       tfs.Unlock()
+       close(ch)
+}
+
+type rootNode struct {
+       fs *torrentFS
+}
+
+type node struct {
+       path     []string
+       metaInfo *metainfo.MetaInfo
+       FS       *torrentFS
+       InfoHash torrent.InfoHash
+}
+
+type fileNode struct {
+       node
+       size          uint64
+       TorrentOffset int64
+}
+
+func (fn fileNode) Attr() (attr fuse.Attr) {
+       attr.Size = fn.size
+       attr.Mode = defaultMode
+       return
+}
+
+func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fusefs.Intr) fuse.Error {
+       if req.Dir {
+               panic("hodor")
+       }
+       dataSpecs := fn.FS.SubscribeData()
+       defer fn.FS.UnsubscribeData(dataSpecs)
+       data := make([]byte, func() int {
+               _len := int64(fn.size) - req.Offset
+               if int64(req.Size) < _len {
+                       return req.Size
+               } else {
+                       // limit read to the end of the file
+                       return int(_len)
+               }
+       }())
+       if len(data) == 0 {
+               return nil
+       }
+       infoHash := torrent.BytesInfoHash(fn.metaInfo.InfoHash)
+       torrentOff := fn.TorrentOffset + req.Offset
+       fn.FS.Client.PrioritizeDataRegion(infoHash, torrentOff, int64(len(data)))
+       for {
+               n, err := fn.FS.Client.TorrentReadAt(infoHash, torrentOff, data)
+               switch err {
+               case nil:
+                       resp.Data = data[:n]
+                       return nil
+               case torrent.ErrDataNotReady:
+                       select {
+                       case <-dataSpecs:
+                       case <-intr:
+                               return fuse.EINTR
+                       }
+               default:
+                       return fuse.EIO
+               }
+       }
+}
+
+type dirNode struct {
+       node
+}
+
+var (
+       _ fusefs.HandleReadDirer = dirNode{}
+
+       _ fusefs.HandleReader = fileNode{}
+)
+
+func isSubPath(parent, child []string) bool {
+       if len(child) <= len(parent) {
+               return false
+       }
+       for i := range parent {
+               if parent[i] != child[i] {
+                       return false
+               }
+       }
+       return true
+}
+
+func (dn dirNode) ReadDir(intr fusefs.Intr) (des []fuse.Dirent, err fuse.Error) {
+       names := map[string]bool{}
+       for _, fi := range dn.metaInfo.Files {
+               if !isSubPath(dn.path, fi.Path) {
+                       continue
+               }
+               name := fi.Path[len(dn.path)]
+               if names[name] {
+                       continue
+               }
+               names[name] = true
+               de := fuse.Dirent{
+                       Name: name,
+               }
+               if len(fi.Path) == len(dn.path)+1 {
+                       de.Type = fuse.DT_File
+               } else {
+                       de.Type = fuse.DT_Dir
+               }
+               des = append(des, de)
+       }
+       return
+}
+
+func (dn dirNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err fuse.Error) {
+       var torrentOffset int64
+       for _, fi := range dn.metaInfo.Files {
+               if !isSubPath(dn.path, fi.Path) {
+                       torrentOffset += fi.Length
+                       continue
+               }
+               if fi.Path[len(dn.path)] != name {
+                       torrentOffset += fi.Length
+                       continue
+               }
+               __node := dn.node
+               __node.path = append(__node.path, name)
+               if len(fi.Path) == len(dn.path)+1 {
+                       _node = fileNode{
+                               node:          __node,
+                               size:          uint64(fi.Length),
+                               TorrentOffset: torrentOffset,
+                       }
+               } else {
+                       _node = dirNode{__node}
+               }
+               break
+       }
+       if _node == nil {
+               err = fuse.ENOENT
+       }
+       return
+}
+
+func (dn dirNode) Attr() (attr fuse.Attr) {
+       attr.Mode = os.ModeDir | defaultMode
+       return
+}
+
+func isSingleFileTorrent(mi *metainfo.MetaInfo) bool {
+       return len(mi.Files) == 1 && mi.Files[0].Path == nil
+}
+
+func (me rootNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err fuse.Error) {
+       for _, _torrent := range me.fs.Client.Torrents() {
+               metaInfo := _torrent.MetaInfo
+               if metaInfo.Name == name {
+                       __node := node{
+                               metaInfo: metaInfo,
+                               FS:       me.fs,
+                               InfoHash: torrent.BytesInfoHash(metaInfo.InfoHash),
+                       }
+                       if isSingleFileTorrent(metaInfo) {
+                               _node = fileNode{__node, uint64(metaInfo.Files[0].Length), 0}
+                       } else {
+                               _node = dirNode{__node}
+                       }
+                       break
+               }
+       }
+       if _node == nil {
+               err = fuse.ENOENT
+       }
+       return
+}
+
+func (me rootNode) ReadDir(intr fusefs.Intr) (dirents []fuse.Dirent, err fuse.Error) {
+       for _, _torrent := range me.fs.Client.Torrents() {
+               metaInfo := _torrent.MetaInfo
+               dirents = append(dirents, fuse.Dirent{
+                       Name: metaInfo.Name,
+                       Type: func() fuse.DirentType {
+                               if isSingleFileTorrent(metaInfo) {
+                                       return fuse.DT_File
+                               } else {
+                                       return fuse.DT_Dir
+                               }
+                       }(),
+               })
+       }
+       return
+}
+
+func (rootNode) Attr() fuse.Attr {
+       return fuse.Attr{
+               Mode: os.ModeDir,
+       }
+}
+
+func (tfs *torrentFS) Root() (fusefs.Node, fuse.Error) {
+       return rootNode{tfs}, nil
+}
+
+func MountAndServe(dir string, cl *torrent.Client) error {
+       conn, err := fuse.Mount(dir)
+       if err != nil {
+               return err
+       }
+       fs := &torrentFS{
+               Client:   cl,
+               DataSubs: make(map[chan torrent.DataSpec]struct{}),
+       }
+       go fs.publishData()
+       return fusefs.Serve(conn, fs)
+}
diff --git a/fs/torrentfs_test.go b/fs/torrentfs_test.go
new file mode 100644 (file)
index 0000000..962d6e2
--- /dev/null
@@ -0,0 +1,103 @@
+package torrentfs
+
+import (
+       "bitbucket.org/anacrolix/go.torrent"
+       "bytes"
+       metainfo "github.com/nsf/libtorgo/torrent"
+       "io"
+       "io/ioutil"
+       "net"
+       "os"
+       "path/filepath"
+       "testing"
+)
+
+func TestTCPAddrString(t *testing.T) {
+       ta := &net.TCPAddr{
+               IP:   net.IPv4(127, 0, 0, 1),
+               Port: 3000,
+       }
+       s := ta.String()
+       l, err := net.Listen("tcp4", "localhost:3000")
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer l.Close()
+       c, err := net.Dial("tcp", l.Addr().String())
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer c.Close()
+       ras := c.RemoteAddr().String()
+       if ras != s {
+               t.FailNow()
+       }
+}
+
+func createDummyTorrentData(dirName string) string {
+       f, _ := os.Create(filepath.Join(dirName, "greeting"))
+       f.WriteString("hello, world\n")
+       return f.Name()
+}
+
+func createMetaInfo(name string, w io.Writer) {
+       builder := metainfo.Builder{}
+       builder.AddFile(name)
+       builder.AddAnnounceGroup([]string{"lol://cheezburger"})
+       batch, err := builder.Submit()
+       if err != nil {
+               panic(err)
+       }
+       errs, _ := batch.Start(w, 1)
+       <-errs
+}
+
+func TestDownloadOnDemand(t *testing.T) {
+       dir, err := ioutil.TempDir("", "torrentfs")
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer func() {
+               if err := os.RemoveAll(dir); err != nil {
+                       t.Error(err)
+               }
+       }()
+       t.Logf("test directory: %s", dir)
+       finishedDir := filepath.Join(dir, "finished")
+       os.Mkdir(finishedDir, 0777)
+       name := createDummyTorrentData(finishedDir)
+       metaInfoBuf := &bytes.Buffer{}
+       createMetaInfo(name, metaInfoBuf)
+       metaInfo, err := metainfo.Load(metaInfoBuf)
+       seeder := torrent.Client{
+               DataDir: finishedDir,
+               Listener: func() net.Listener {
+                       conn, err := net.Listen("tcp", ":0")
+                       if err != nil {
+                               panic(err)
+                       }
+                       return conn
+               }(),
+       }
+       seeder.Start()
+       seeder.AddTorrent(metaInfo)
+       leecher := torrent.Client{
+               DataDir:   filepath.Join(dir, "download"),
+               DataReady: make(chan torrent.DataSpec),
+       }
+       leecher.Start()
+       leecher.AddTorrent(metaInfo)
+       leecher.AddPeers(torrent.BytesInfoHash(metaInfo.InfoHash), []torrent.Peer{func() torrent.Peer {
+               tcpAddr := seeder.Listener.Addr().(*net.TCPAddr)
+               return torrent.Peer{
+                       IP:   tcpAddr.IP,
+                       Port: tcpAddr.Port,
+               }
+       }()})
+       mountDir := filepath.Join(dir, "mnt")
+       os.Mkdir(mountDir, 0777)
+       err = MountAndServe(mountDir, &leecher)
+       if err != nil {
+               t.Fatal(err)
+       }
+}
index 2f9547ef29dcf439686da9e610cbfe5b3280c52b..281d84869a9666929ccedc6aa91081bc256dd5ae 100644 (file)
@@ -24,15 +24,15 @@ const (
 )
 
 const (
-       Choke MessageType = iota
-       Unchoke
-       Interested
-       NotInterested
-       Have
-       Bitfield
-       Request
-       Piece
-       Cancel
+       Choke         MessageType = iota
+       Unchoke                   // 1
+       Interested                // 2
+       NotInterested             // 3
+       Have                      // 4
+       Bitfield                  // 5
+       Request                   // 6
+       Piece                     // 7
+       Cancel                    // 8
 )
 
 type Message struct {
@@ -66,8 +66,22 @@ func (msg Message) MarshalBinary() (data []byte, err error) {
                }
        case Bitfield:
                _, err = buf.Write(marshalBitfield(msg.Bitfield))
+       case Piece:
+               for _, i := range []Integer{msg.Index, msg.Begin} {
+                       err = binary.Write(buf, binary.BigEndian, i)
+                       if err != nil {
+                               return
+                       }
+               }
+               n, err := buf.Write(msg.Piece)
+               if err != nil {
+                       break
+               }
+               if n != len(msg.Piece) {
+                       panic(n)
+               }
        default:
-               err = errors.New("unknown message type")
+               err = fmt.Errorf("unknown message type: %s", msg.Type)
        }
        data = make([]byte, 4+buf.Len())
        binary.BigEndian.PutUint32(data, uint32(buf.Len()))
@@ -114,7 +128,12 @@ func (d *Decoder) Decode(msg *Message) (err error) {
        case Have:
                err = msg.Index.Read(r)
        case Request, Cancel:
-               err = binary.Read(r, binary.BigEndian, []*Integer{&msg.Index, &msg.Begin, &msg.Length})
+               for _, data := range []*Integer{&msg.Index, &msg.Begin, &msg.Length} {
+                       err = data.Read(r)
+                       if err != nil {
+                               break
+                       }
+               }
        case Bitfield:
                b := make([]byte, length-1)
                _, err = io.ReadFull(r, b)
index caebd8eff74ad65eb5bbe7f9c4d0cd4004869b5a..e9e453521d351558e5916154666c0b070d1d0191 100644 (file)
@@ -1,9 +1,27 @@
 package peer_protocol
 
 import (
+       "bytes"
        "testing"
 )
 
+func TestBinaryReadSliceOfPointers(t *testing.T) {
+       var msg Message
+       r := bytes.NewBufferString("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00@\x00")
+       if r.Len() != 12 {
+               t.Fatalf("expected 12 bytes left, but there %d", r.Len())
+       }
+       for _, data := range []*Integer{&msg.Index, &msg.Begin, &msg.Length} {
+               err := data.Read(r)
+               if err != nil {
+                       t.Fatal(err)
+               }
+       }
+       if r.Len() != 0 {
+               t.FailNow()
+       }
+}
+
 func TestConstants(t *testing.T) {
        // check that iota works as expected in the const block
        if NotInterested != 3 {