type Connection struct {
Socket net.Conn
+ Closed bool
post chan encoding.BinaryMarshaler
write chan []byte
}
func (c *Connection) Close() {
+ if c.Closed {
+ return
+ }
c.Socket.Close()
close(c.post)
+ c.Closed = true
}
func (c *Connection) PeerHasPiece(index peer_protocol.Integer) bool {
sync.Mutex
mu *sync.Mutex
event sync.Cond
+ quit chan struct{}
halfOpen int
torrents map[InfoHash]*Torrent
if err != nil {
panic("error generating peer id")
}
+ c.quit = make(chan struct{})
if c.Listener != nil {
go c.acceptConnections()
}
}
+func (me *Client) Stop() {
+ close(me.quit)
+ me.event.Broadcast()
+ for _, t := range me.torrents {
+ for _, c := range t.Conns {
+ c.Close()
+ }
+ }
+}
+
func (cl *Client) acceptConnections() {
for {
conn, err := cl.Listener.Accept()
+ select {
+ case <-cl.quit:
+ return
+ default:
+ }
if err != nil {
log.Print(err)
return
me.mu.Unlock()
}
-func (me *Client) Stop() {
-}
-
func (me *Client) replenishConnRequests(torrent *Torrent, conn *Connection) {
requestHeatMap := torrent.requestHeat()
addRequest := func(req Request) (again bool) {
close(ch)
}
+var _ fusefs.NodeForgetter = rootNode{}
+
type rootNode struct {
fs *torrentFS
}
}
}
+func (rootNode) Forget() {
+}
+
func (tfs *torrentFS) Root() (fusefs.Node, fuse.Error) {
return rootNode{tfs}, nil
}
-func MountAndServe(dir string, cl *torrent.Client) error {
- conn, err := fuse.Mount(dir)
- if err != nil {
- return err
- }
+func New(cl *torrent.Client) *torrentFS {
fs := &torrentFS{
Client: cl,
DataSubs: make(map[chan torrent.DataSpec]struct{}),
}
go fs.publishData()
- return fusefs.Serve(conn, fs)
+ return fs
}
package torrentfs
import (
+ "bazil.org/fuse"
+ fusefs "bazil.org/fuse/fs"
"bitbucket.org/anacrolix/go.torrent"
"bytes"
metainfo "github.com/nsf/libtorgo/torrent"
"net"
"os"
"path/filepath"
+ "runtime"
"testing"
)
}
}
+const dummyFileContents = "hello, world\n"
+
func createDummyTorrentData(dirName string) string {
f, _ := os.Create(filepath.Join(dirName, "greeting"))
f.WriteString("hello, world\n")
}
func TestDownloadOnDemand(t *testing.T) {
+ priorNumGoroutines := runtime.NumGoroutine()
+ defer func() {
+ n := runtime.NumGoroutine()
+ if n != priorNumGoroutines {
+ t.Fatalf("expected %d goroutines, but %d are running", priorNumGoroutines, n)
+ }
+ }()
dir, err := ioutil.TempDir("", "torrentfs")
if err != nil {
t.Fatal(err)
return conn
}(),
}
+ defer seeder.Listener.Close()
seeder.Start()
+ defer seeder.Stop()
seeder.AddTorrent(metaInfo)
leecher := torrent.Client{
DataDir: filepath.Join(dir, "download"),
DataReady: make(chan torrent.DataSpec),
}
leecher.Start()
+ defer leecher.Stop()
leecher.AddTorrent(metaInfo)
leecher.AddPeers(torrent.BytesInfoHash(metaInfo.InfoHash), []torrent.Peer{func() torrent.Peer {
tcpAddr := seeder.Listener.Addr().(*net.TCPAddr)
}()})
mountDir := filepath.Join(dir, "mnt")
os.Mkdir(mountDir, 0777)
- err = MountAndServe(mountDir, &leecher)
+ fs := New(&leecher)
+ fuseConn, err := fuse.Mount(mountDir)
if err != nil {
t.Fatal(err)
}
+ go func() {
+ if err := fusefs.Serve(fuseConn, fs); err != nil {
+ t.Fatal(err)
+ }
+ if err := fuseConn.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+ <-fuseConn.Ready
+ if fuseConn.MountError != nil {
+ t.Fatal(fuseConn.MountError)
+ }
+ content, err := ioutil.ReadFile(filepath.Join(mountDir, "greeting"))
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err := fuse.Unmount(mountDir); err != nil {
+ t.Fatal(err)
+ }
+ if string(content) != dummyFileContents {
+ t.FailNow()
+ }
}