]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Various progress, particularly around the way data readiness is handled
authorMatt Joiner <anacrolix@gmail.com>
Wed, 19 Mar 2014 17:30:08 +0000 (04:30 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 19 Mar 2014 17:30:08 +0000 (04:30 +1100)
client.go
cmd/torrentfs/main.go
fs/torrentfs.go
fs/torrentfs_test.go

index 077a5ac1fc3cc14b5a4fe39af62c62bbed44a370..8f163780412581a94e3a7ae4717ad08f6d61a3c2 100644 (file)
--- a/client.go
+++ b/client.go
@@ -52,11 +52,12 @@ type piece struct {
        Hash              pieceSum
        PendingChunkSpecs map[ChunkSpec]struct{}
        Hashing           bool
+       QueuedForHash     bool
        EverHashed        bool
 }
 
 func (p *piece) Complete() bool {
-       return len(p.PendingChunkSpecs) == 0 && !p.Hashing && p.EverHashed
+       return len(p.PendingChunkSpecs) == 0 && p.EverHashed
 }
 
 func lastChunkSpec(pieceLength peer_protocol.Integer) (cs ChunkSpec) {
@@ -126,7 +127,11 @@ func (c *Connection) Post(msg encoding.BinaryMarshaler) {
        c.post <- msg
 }
 
+// Returns true if more requests can be sent.
 func (c *Connection) Request(chunk Request) bool {
+       if !c.PeerPieces[chunk.Index] {
+               panic("peer doesn't have that piece!")
+       }
        if len(c.Requests) >= maxRequests {
                return false
        }
@@ -291,20 +296,19 @@ func (t *Torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) {
 // Currently doesn't really queue, but should in the future.
 func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) {
        piece := t.Pieces[pieceIndex]
-       if piece.Hashing {
+       if piece.QueuedForHash {
                return
        }
-       piece.Hashing = true
+       piece.QueuedForHash = true
        go cl.verifyPiece(t, pieceIndex)
 }
 
 func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
-       log.Print(len_)
        cl.mu.Lock()
        defer cl.mu.Unlock()
        t := cl.torrent(ih)
        newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize)
-       for len_ != 0 {
+       for len_ > 0 {
                // TODO: Write a function to return the Request for a given offset.
                index := peer_protocol.Integer(off / t.MetaInfo.PieceLength)
                pieceOff := peer_protocol.Integer(off % t.MetaInfo.PieceLength)
@@ -313,8 +317,8 @@ func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
                        cl.queuePieceCheck(t, index)
                }
                chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize}
-               if int64(chunk.Length) > len_ {
-                       chunk.Length = peer_protocol.Integer(len_)
+               if chunk.Begin+chunk.Length > t.PieceLength(index) {
+                       chunk.Length = t.PieceLength(index) - chunk.Begin
                }
                adv := int64(chunk.Length - pieceOff%chunkSize)
                off += adv
@@ -324,7 +328,7 @@ func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
                }
                newPriorities = append(newPriorities, Request{index, chunk})
        }
-       if len(newPriorities) < 1 {
+       if len(newPriorities) == 0 {
                return
        }
        log.Print(newPriorities)
@@ -347,7 +351,7 @@ func (t *Torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
 
 func (t *Torrent) bitfield() (bf []bool) {
        for _, p := range t.Pieces {
-               bf = append(bf, p.EverHashed && !p.Hashing && len(p.PendingChunkSpecs) == 0)
+               bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0)
        }
        return
 }
@@ -413,19 +417,20 @@ type DataSpec struct {
 }
 
 type Client struct {
-       DataDir       string
-       HalfOpenLimit int
-       PeerId        [20]byte
-       DataReady     chan DataSpec
-       Listener      net.Listener
+       DataDir         string
+       HalfOpenLimit   int
+       PeerId          [20]byte
+       Listener        net.Listener
+       DisableTrackers bool
 
        sync.Mutex
        mu    *sync.Mutex
        event sync.Cond
        quit  chan struct{}
 
-       halfOpen int
-       torrents map[InfoHash]*Torrent
+       halfOpen   int
+       torrents   map[InfoHash]*Torrent
+       dataWaiter chan struct{}
 }
 
 var (
@@ -711,7 +716,7 @@ func (me *Client) peerGotPiece(torrent *Torrent, conn *Connection, piece int) {
 
 func (t *Torrent) wantPiece(index int) bool {
        p := t.Pieces[index]
-       return p.EverHashed && !p.Hashing && len(p.PendingChunkSpecs) != 0
+       return p.EverHashed && len(p.PendingChunkSpecs) != 0
 }
 
 func (me *Client) peerUnchoked(torrent *Torrent, conn *Connection) {
@@ -731,7 +736,6 @@ func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error {
                if err != nil {
                        return err
                }
-               log.Print(msg.Type)
                if msg.Keepalive {
                        continue
                }
@@ -803,6 +807,7 @@ func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error {
                if err != nil {
                        return err
                }
+               log.Print("replenishing from loop")
                me.replenishConnRequests(torrent, conn)
        }
 }
@@ -915,7 +920,9 @@ func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
                return torrent.Close()
        }
        me.torrents[torrent.InfoHash] = torrent
-       go me.announceTorrent(torrent)
+       if !me.DisableTrackers {
+               go me.announceTorrent(torrent)
+       }
        for i := range torrent.Pieces {
                me.queuePieceCheck(torrent, peer_protocol.Integer(i))
        }
@@ -985,23 +992,32 @@ func (me *Client) replenishConnRequests(torrent *Torrent, conn *Connection) {
        addRequest := func(req Request) (again bool) {
                piece := torrent.Pieces[req.Index]
                if piece.Hashing {
+                       // We can't be sure we want this.
+                       log.Print("piece is hashing")
                        return true
                }
                if piece.Complete() {
+                       log.Print("piece is complete")
+                       // We already have this.
                        return true
                }
                if requestHeatMap[req] > 0 {
+                       log.Print("piece is hot")
+                       // We've already requested this.
                        return true
                }
                return conn.Request(req)
        }
+       // First request prioritized chunks.
        if torrent.Priorities != nil {
                for e := torrent.Priorities.Front(); e != nil; e = e.Next() {
+                       log.Print(e.Value.(Request))
                        if !addRequest(e.Value.(Request)) {
                                return
                        }
                }
        }
+       // Then finish of incomplete pieces in order of bytes remaining.
        for _, index := range torrent.piecesByPendingBytesDesc() {
                if torrent.PieceNumPendingBytes(index) == torrent.PieceLength(index) {
                        continue
@@ -1042,22 +1058,26 @@ func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message)
 }
 
 func (cl *Client) dataReady(ds DataSpec) {
-       if cl.DataReady == nil {
-               return
+       if cl.dataWaiter != nil {
+               close(cl.dataWaiter)
        }
-       go func() {
-               cl.DataReady <- ds
-       }()
+       cl.dataWaiter = nil
+}
+
+func (cl *Client) DataWaiter() <-chan struct{} {
+       cl.Lock()
+       defer cl.Unlock()
+       if cl.dataWaiter == nil {
+               cl.dataWaiter = make(chan struct{})
+       }
+       return cl.dataWaiter
 }
 
 func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct bool) {
        p := t.Pieces[piece]
-       if !p.Hashing {
-               panic("invalid state")
-       }
-       p.Hashing = false
        p.EverHashed = true
        if correct {
+               log.Print("piece passed hash")
                p.PendingChunkSpecs = nil
                var next *list.Element
                if t.Priorities != nil {
@@ -1076,6 +1096,7 @@ func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct b
                        },
                })
        } else {
+               log.Print("piece failed hash")
                if len(p.PendingChunkSpecs) == 0 {
                        p.PendingChunkSpecs = t.pieceChunkSpecs(piece)
                }
@@ -1096,11 +1117,18 @@ func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct b
 }
 
 func (cl *Client) verifyPiece(t *Torrent, index peer_protocol.Integer) {
+       cl.mu.Lock()
+       p := t.Pieces[index]
+       for p.Hashing {
+               cl.event.Wait()
+       }
+       p.Hashing = true
+       p.QueuedForHash = false
+       cl.mu.Unlock()
        sum := t.HashPiece(index)
        cl.mu.Lock()
-       piece := t.Pieces[index]
-       cl.pieceHashed(t, index, sum == piece.Hash)
-       piece.Hashing = false
+       p.Hashing = false
+       cl.pieceHashed(t, index, sum == p.Hash)
        cl.mu.Unlock()
 }
 
index f13774c1626bef3515e9bb13334b7fda34009f30..7a13a3fe8d0c3274a6ce7bff8527a0a64db78e2d 100644 (file)
@@ -4,6 +4,7 @@ import (
        "bazil.org/fuse"
        fusefs "bazil.org/fuse/fs"
        "bitbucket.org/anacrolix/go.torrent"
+       "bitbucket.org/anacrolix/go.torrent/fs"
        "flag"
        metainfo "github.com/nsf/libtorgo/torrent"
        "log"
@@ -11,16 +12,21 @@ import (
        "net/http"
        _ "net/http/pprof"
        "os"
+       "os/signal"
        "os/user"
        "path/filepath"
-       "sync"
+       "syscall"
        "time"
 )
 
 var (
-       downloadDir string
-       torrentPath string
-       mountDir    string
+       downloadDir     string
+       torrentPath     string
+       mountDir        string
+       disableTrackers = flag.Bool("disableTrackers", false, "disables trackers")
+       testPeer        = flag.String("testPeer", "", "the address for a test peer")
+       pprofAddr       = flag.String("pprofAddr", "", "pprof HTTP server bind address")
+       testPeerAddr    *net.TCPAddr
 )
 
 func init() {
@@ -35,17 +41,51 @@ func init() {
        flag.StringVar(&mountDir, "mountDir", "", "location the torrent contents are made available")
 }
 
+func resolveTestPeerAddr() {
+       if *testPeer == "" {
+               return
+       }
+       var err error
+       testPeerAddr, err = net.ResolveTCPAddr("tcp4", *testPeer)
+       if err != nil {
+               log.Fatal(err)
+       }
+}
+
+func setSignalHandlers() {
+       c := make(chan os.Signal)
+       signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
+       go func() {
+               <-c
+               fuse.Unmount(mountDir)
+       }()
+}
+
 func main() {
-       pprofAddr := flag.String("pprofAddr", "", "pprof HTTP server bind address")
-       testPeer := flag.String("testPeer", "", "the address for a test peer")
        flag.Parse()
+       if flag.NArg() != 0 {
+               os.Stderr.WriteString("one does not simply pass positional args\n")
+               os.Exit(2)
+       }
+       if mountDir == "" {
+               os.Stderr.WriteString("y u no specify mountpoint?\n")
+               os.Exit(2)
+       }
        log.SetFlags(log.LstdFlags | log.Lshortfile)
        if *pprofAddr != "" {
                go http.ListenAndServe(*pprofAddr, nil)
        }
+       conn, err := fuse.Mount(mountDir)
+       if err != nil {
+               log.Fatal(err)
+       }
+       defer fuse.Unmount(mountDir)
+       // TODO: Think about the ramifications of exiting not due to a signal.
+       setSignalHandlers()
+       defer conn.Close()
        client := &torrent.Client{
-               DataDir:       downloadDir,
-               HalfOpenLimit: 2,
+               DataDir:         downloadDir,
+               DisableTrackers: *disableTrackers,
        }
        client.Start()
        torrentDir, err := os.Open(torrentPath)
@@ -57,13 +97,7 @@ func main() {
        if err != nil {
                log.Fatal(err)
        }
-       var testAddr *net.TCPAddr
-       if *testPeer != "" {
-               testAddr, err = net.ResolveTCPAddr("tcp4", *testPeer)
-               if err != nil {
-                       log.Fatal(err)
-               }
-       }
+       resolveTestPeerAddr()
        for _, name := range names {
                metaInfo, err := metainfo.LoadFromFile(filepath.Join(torrentPath, name))
                if err != nil {
@@ -74,31 +108,23 @@ func main() {
                        log.Print(err)
                }
        }
-       conn, err := fuse.Mount(mountDir)
-       if err != nil {
-               log.Fatal(err)
-       }
-       fs := &TorrentFS{
-               Client:   client,
-               DataSubs: make(map[chan torrent.DataSpec]struct{}),
-       }
-       go fs.publishData()
+       fs := torrentfs.New(client)
        go func() {
                for {
                torrentLoop:
                        for _, t := range client.Torrents() {
                                client.Lock()
                                for _, c := range t.Conns {
-                                       if c.Socket.RemoteAddr().String() == testAddr.String() {
+                                       if c.Socket.RemoteAddr().String() == testPeerAddr.String() {
                                                client.Unlock()
                                                continue torrentLoop
                                        }
                                }
                                client.Unlock()
-                               if testAddr != nil {
+                               if testPeerAddr != nil {
                                        if err := client.AddPeers(t.InfoHash, []torrent.Peer{{
-                                               IP:   testAddr.IP,
-                                               Port: testAddr.Port,
+                                               IP:   testPeerAddr.IP,
+                                               Port: testPeerAddr.Port,
                                        }}); err != nil {
                                                log.Print(err)
                                        }
@@ -107,5 +133,7 @@ func main() {
                        time.Sleep(10 * time.Second)
                }
        }()
-       fusefs.Serve(conn, fs)
+       if err := fusefs.Serve(conn, fs); err != nil {
+               log.Fatal(err)
+       }
 }
index 0d8ae09e0f1415a30d8d158e80beeb63413e29f9..44710e8a598e4a6ad348bf00942a0c7a35bba7b2 100644 (file)
@@ -5,6 +5,7 @@ import (
        fusefs "bazil.org/fuse/fs"
        "bitbucket.org/anacrolix/go.torrent"
        metainfo "github.com/nsf/libtorgo/torrent"
+       "log"
        "os"
        "sync"
 )
@@ -19,36 +20,6 @@ type torrentFS struct {
        sync.Mutex
 }
 
-func (tfs *torrentFS) publishData() {
-       for {
-               spec := <-tfs.Client.DataReady
-               tfs.Lock()
-               for ds := range tfs.DataSubs {
-                       ds <- spec
-               }
-               tfs.Unlock()
-       }
-}
-
-func (tfs *torrentFS) SubscribeData() chan torrent.DataSpec {
-       ch := make(chan torrent.DataSpec)
-       tfs.Lock()
-       tfs.DataSubs[ch] = struct{}{}
-       tfs.Unlock()
-       return ch
-}
-
-func (tfs *torrentFS) UnsubscribeData(ch chan torrent.DataSpec) {
-       go func() {
-               for _ = range ch {
-               }
-       }()
-       tfs.Lock()
-       delete(tfs.DataSubs, ch)
-       tfs.Unlock()
-       close(ch)
-}
-
 var _ fusefs.NodeForgetter = rootNode{}
 
 type rootNode struct {
@@ -78,8 +49,6 @@ func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fus
        if req.Dir {
                panic("hodor")
        }
-       dataSpecs := fn.FS.SubscribeData()
-       defer fn.FS.UnsubscribeData(dataSpecs)
        data := make([]byte, func() int {
                _len := int64(fn.size) - req.Offset
                if int64(req.Size) < _len {
@@ -94,8 +63,10 @@ func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fus
        }
        infoHash := torrent.BytesInfoHash(fn.metaInfo.InfoHash)
        torrentOff := fn.TorrentOffset + req.Offset
+       log.Print(torrentOff, len(data), fn.TorrentOffset)
        fn.FS.Client.PrioritizeDataRegion(infoHash, torrentOff, int64(len(data)))
        for {
+               dataWaiter := fn.FS.Client.DataWaiter()
                n, err := fn.FS.Client.TorrentReadAt(infoHash, torrentOff, data)
                switch err {
                case nil:
@@ -103,11 +74,12 @@ func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fus
                        return nil
                case torrent.ErrDataNotReady:
                        select {
-                       case <-dataSpecs:
+                       case <-dataWaiter:
                        case <-intr:
                                return fuse.EINTR
                        }
                default:
+                       log.Print(err)
                        return fuse.EIO
                }
        }
@@ -256,6 +228,5 @@ func New(cl *torrent.Client) *torrentFS {
                Client:   cl,
                DataSubs: make(map[chan torrent.DataSpec]struct{}),
        }
-       go fs.publishData()
        return fs
 }
index a1905c263ad3b1fffca402b14c50238ed39290ea..0c38850d7773471d07444be91d1e5798724af598 100644 (file)
@@ -96,8 +96,7 @@ func TestDownloadOnDemand(t *testing.T) {
        defer seeder.Stop()
        seeder.AddTorrent(metaInfo)
        leecher := torrent.Client{
-               DataDir:   filepath.Join(dir, "download"),
-               DataReady: make(chan torrent.DataSpec),
+               DataDir: filepath.Join(dir, "download"),
        }
        leecher.Start()
        defer leecher.Stop()
@@ -116,6 +115,11 @@ func TestDownloadOnDemand(t *testing.T) {
        if err != nil {
                t.Fatal(err)
        }
+       defer func() {
+               if err := fuse.Unmount(mountDir); err != nil {
+                       t.Fatal(err)
+               }
+       }()
        go func() {
                if err := fusefs.Serve(fuseConn, fs); err != nil {
                        t.Fatal(err)
@@ -132,9 +136,6 @@ func TestDownloadOnDemand(t *testing.T) {
        if err != nil {
                t.Fatal(err)
        }
-       if err := fuse.Unmount(mountDir); err != nil {
-               t.Fatal(err)
-       }
        if string(content) != dummyFileContents {
                t.FailNow()
        }