]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Torrent client publishes data ready information; torrentfs supports file reads
authorMatt Joiner <anacrolix@gmail.com>
Sun, 13 Oct 2013 12:16:21 +0000 (23:16 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 13 Oct 2013 12:16:21 +0000 (23:16 +1100)
client.go
cmd/torrentfs/main.go

index af67137c9e092f6724ca2205a76efa15f3a9115c..3e92663d4f652f224b668b0fe6f4c8a66d4dab65 100644 (file)
--- a/client.go
+++ b/client.go
@@ -53,16 +53,16 @@ const (
 type piece struct {
        State             pieceState
        Hash              pieceSum
-       PendingChunkSpecs map[chunkSpec]struct{}
+       PendingChunkSpecs map[ChunkSpec]struct{}
 }
 
-type chunkSpec struct {
+type ChunkSpec struct {
        Begin, Length peer_protocol.Integer
 }
 
-type request struct {
+type Request struct {
        Index peer_protocol.Integer
-       chunkSpec
+       ChunkSpec
 }
 
 type connection struct {
@@ -72,12 +72,12 @@ type connection struct {
 
        Interested bool
        Choked     bool
-       Requests   map[request]struct{}
+       Requests   map[Request]struct{}
 
        PeerId         [20]byte
        PeerInterested bool
        PeerChoked     bool
-       PeerRequests   map[request]struct{}
+       PeerRequests   map[Request]struct{}
        PeerExtensions [8]byte
        PeerPieces     []bool
 }
@@ -93,7 +93,7 @@ func (c *connection) Post(msg encoding.BinaryMarshaler) {
        c.post <- msg
 }
 
-func (c *connection) Request(chunk request) bool {
+func (c *connection) Request(chunk Request) bool {
        if len(c.Requests) >= maxRequests {
                return false
        }
@@ -106,7 +106,7 @@ func (c *connection) Request(chunk request) bool {
                })
        }
        if c.Requests == nil {
-               c.Requests = make(map[request]struct{}, maxRequests)
+               c.Requests = make(map[Request]struct{}, maxRequests)
        }
        c.Requests[chunk] = struct{}{}
        return true
@@ -188,12 +188,12 @@ func (t *Torrent) bitfield() (bf []bool) {
        return
 }
 
-func (t *Torrent) pieceChunkSpecs(index int) (cs map[chunkSpec]struct{}) {
-       cs = make(map[chunkSpec]struct{}, (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
-       c := chunkSpec{
+func (t *Torrent) pieceChunkSpecs(index int) (cs map[ChunkSpec]struct{}) {
+       cs = make(map[ChunkSpec]struct{}, (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
+       c := ChunkSpec{
                Begin: 0,
        }
-       for left := peer_protocol.Integer(t.PieceSize(index)); left > 0; left -= c.Length {
+       for left := peer_protocol.Integer(t.PieceLength(index)); left > 0; left -= c.Length {
                c.Length = left
                if c.Length > chunkSize {
                        c.Length = chunkSize
@@ -204,8 +204,8 @@ func (t *Torrent) pieceChunkSpecs(index int) (cs map[chunkSpec]struct{}) {
        return
 }
 
-func (t *Torrent) requestHeat() (ret map[request]int) {
-       ret = make(map[request]int)
+func (t *Torrent) requestHeat() (ret map[Request]int) {
+       ret = make(map[Request]int)
        for _, conn := range t.Conns {
                for req, _ := range conn.Requests {
                        ret[req]++
@@ -220,12 +220,12 @@ type Peer struct {
        Port int
 }
 
-func (t *Torrent) PieceSize(piece int) (size int64) {
+func (t *Torrent) PieceLength(piece int) (len_ peer_protocol.Integer) {
        if piece == len(t.Pieces)-1 {
-               size = t.Data.Size() % t.MetaInfo.PieceLength
+               len_ = peer_protocol.Integer(t.Data.Size() % t.MetaInfo.PieceLength)
        }
-       if size == 0 {
-               size = t.MetaInfo.PieceLength
+       if len_ == 0 {
+               len_ = peer_protocol.Integer(t.MetaInfo.PieceLength)
        }
        return
 }
@@ -236,17 +236,23 @@ func (t *Torrent) HashPiece(piece int) (ps pieceSum) {
        if err != nil {
                panic(err)
        }
-       if n != t.PieceSize(piece) {
-               panic(fmt.Sprintf("hashed wrong number of bytes: expected %d; did %d; piece %d", t.PieceSize(piece), n, piece))
+       if peer_protocol.Integer(n) != t.PieceLength(piece) {
+               panic(fmt.Sprintf("hashed wrong number of bytes: expected %d; did %d; piece %d", t.PieceLength(piece), n, piece))
        }
        copyHashSum(ps[:], hash.Sum(nil))
        return
 }
 
+type DataSpec struct {
+       InfoHash
+       Request
+}
+
 type Client struct {
        DataDir       string
        HalfOpenLimit int
        PeerId        [20]byte
+       DataReady     chan DataSpec
 
        halfOpen int
        torrents map[InfoHash]*Torrent
@@ -257,25 +263,66 @@ type Client struct {
        actorTask       chan func()
 }
 
-func NewClient(dataDir string) *Client {
-       c := &Client{
-               DataDir:       dataDir,
-               HalfOpenLimit: 10,
+var (
+       ErrDataNotReady = errors.New("data not ready")
+)
 
-               torrents: make(map[InfoHash]*Torrent),
+func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
+       done := make(chan struct{})
+       cl.withContext(func() {
+               defer func() {
+                       close(done)
+               }()
+               t := cl.torrent(ih)
+               if t == nil {
+                       err = errors.New("unknown torrent")
+                       return
+               }
+               index := int(off / int64(t.PieceLength(0)))
+               piece := t.Pieces[index]
+               pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
+               switch piece.State {
+               case pieceStateComplete:
+                       high := int(t.PieceLength(index) - peer_protocol.Integer(off%int64(t.PieceLength(0))))
+                       if high < len(p) {
+                               p = p[:high]
+                       }
+               case pieceStateIncomplete:
+                       for cs, _ := range piece.PendingChunkSpecs {
+                               chunkOff := int64(pieceOff - cs.Begin)
+                               if 0 <= chunkOff && chunkOff < int64(cs.Length) {
+                                       // read begins in a pending chunk
+                                       err = ErrDataNotReady
+                                       return
+                               }
+                               // pending chunk caps available data
+                               if chunkOff < 0 && int64(len(p)) > -chunkOff {
+                                       p = p[:-chunkOff]
+                               }
+                       }
+               default:
+                       err = ErrDataNotReady
+                       return
+               }
+               n, err = t.Data.ReadAt(p, off)
+       })
+       <-done
+       log.Println(n, err)
+       return
+}
 
-               noTorrents:      make(chan struct{}),
-               addTorrent:      make(chan *Torrent),
-               torrentFinished: make(chan InfoHash),
-               actorTask:       make(chan func()),
-       }
+func (c *Client) Start() {
+       c.torrents = make(map[InfoHash]*Torrent)
+       c.noTorrents = make(chan struct{})
+       c.addTorrent = make(chan *Torrent)
+       c.torrentFinished = make(chan InfoHash)
+       c.actorTask = make(chan func())
        o := copy(c.PeerId[:], BEP20)
        _, err := rand.Read(c.PeerId[o:])
        if err != nil {
                panic("error generating peer id")
        }
        go c.run()
-       return c
 }
 
 func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan, err error) {
@@ -482,9 +529,9 @@ func (me *Client) runConnection(torrent *Torrent, conn *connection) error {
                        case peer_protocol.Have:
                                me.peerGotPiece(torrent, conn, int(msg.Index))
                        case peer_protocol.Request:
-                               conn.PeerRequests[request{
+                               conn.PeerRequests[Request{
                                        Index:     msg.Index,
-                                       chunkSpec: chunkSpec{msg.Begin, msg.Length},
+                                       ChunkSpec: ChunkSpec{msg.Begin, msg.Length},
                                }] = struct{}{}
                        case peer_protocol.Bitfield:
                                if len(msg.Bitfield) < len(torrent.Pieces) {
@@ -502,13 +549,13 @@ func (me *Client) runConnection(torrent *Torrent, conn *connection) error {
                                        }
                                }
                        case peer_protocol.Piece:
-                               request_ := request{msg.Index, chunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
+                               request_ := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
                                if _, ok := conn.Requests[request_]; !ok {
                                        err = errors.New("unexpected piece")
                                        break
                                }
                                delete(conn.Requests, request_)
-                               if _, ok := torrent.Pieces[request_.Index].PendingChunkSpecs[request_.chunkSpec]; !ok {
+                               if _, ok := torrent.Pieces[request_.Index].PendingChunkSpecs[request_.ChunkSpec]; !ok {
                                        log.Printf("got unnecessary chunk: %s", request_)
                                        break
                                }
@@ -516,7 +563,8 @@ func (me *Client) runConnection(torrent *Torrent, conn *connection) error {
                                if err != nil {
                                        break
                                }
-                               delete(torrent.Pieces[request_.Index].PendingChunkSpecs, request_.chunkSpec)
+                               delete(torrent.Pieces[request_.Index].PendingChunkSpecs, request_.ChunkSpec)
+                               me.downloadedChunk(torrent, request_)
                                if len(torrent.Pieces[request_.Index].PendingChunkSpecs) == 0 {
                                        torrent.Pieces[request_.Index].State = pieceStateUnknown
                                        go me.verifyPiece(torrent, int(request_.Index))
@@ -589,6 +637,7 @@ func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) (err error) {
 func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
        torrent := &Torrent{
                InfoHash: BytesInfoHash(metaInfo.InfoHash),
+               MetaInfo: metaInfo,
        }
        for offset := 0; offset < len(metaInfo.Pieces); offset += PieceHash.Size() {
                hash := metaInfo.Pieces[offset : offset+PieceHash.Size()]
@@ -604,7 +653,6 @@ func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
        if err != nil {
                return err
        }
-       torrent.MetaInfo = metaInfo
        me.addTorrent <- torrent
        return nil
 }
@@ -613,7 +661,7 @@ func (me *Client) WaitAll() {
        <-me.noTorrents
 }
 
-func (me *Client) Close() {
+func (me *Client) Stop() {
 }
 
 func (me *Client) withContext(f func()) {
@@ -633,7 +681,7 @@ func (me *Client) replenishConnRequests(torrent *Torrent, conn *connection) {
                        continue
                }
                for chunkSpec, _ := range torrent.Pieces[index].PendingChunkSpecs {
-                       request := request{peer_protocol.Integer(index), chunkSpec}
+                       request := Request{peer_protocol.Integer(index), chunkSpec}
                        if heat := requestHeatMap[request]; heat > 0 {
                                continue
                        }
@@ -644,7 +692,18 @@ func (me *Client) replenishConnRequests(torrent *Torrent, conn *connection) {
                }
        }
        //conn.SetInterested(false)
+}
 
+func (me *Client) downloadedChunk(t *Torrent, chunk Request) {
+}
+
+func (cl *Client) dataReady(ds DataSpec) {
+       if cl.DataReady == nil {
+               return
+       }
+       go func() {
+               cl.DataReady <- ds
+       }()
 }
 
 func (me *Client) pieceHashed(ih InfoHash, piece int, correct bool) {
@@ -661,8 +720,18 @@ func (me *Client) pieceHashed(ih InfoHash, piece int, correct bool) {
                return
        }
        torrent.Pieces[piece].State = newState
-       if newState == pieceStateIncomplete {
+       switch newState {
+       case pieceStateIncomplete:
                torrent.Pieces[piece].PendingChunkSpecs = torrent.pieceChunkSpecs(piece)
+       case pieceStateComplete:
+               log.Print(piece)
+               me.dataReady(DataSpec{
+                       torrent.InfoHash,
+                       Request{
+                               peer_protocol.Integer(piece),
+                               ChunkSpec{0, peer_protocol.Integer(torrent.PieceLength(piece))},
+                       },
+               })
        }
        for _, conn := range torrent.Conns {
                if correct {
index e567aa29e6c65029e4aae7c6f468c614ae5960d4..51cd516b9de0f260508db81770094142c3b78114 100644 (file)
@@ -5,11 +5,16 @@ import (
        fusefs "bazil.org/fuse/fs"
        "bitbucket.org/anacrolix/go.torrent"
        "flag"
+       "github.com/davecheney/profile"
        metainfo "github.com/nsf/libtorgo/torrent"
        "log"
+       "net/http"
+       _ "net/http/pprof"
        "os"
        "os/user"
        "path/filepath"
+       "sync"
+       "syscall"
 )
 
 var (
@@ -35,7 +40,40 @@ func init() {
 }
 
 type TorrentFS struct {
-       Client *torrent.Client
+       Client   *torrent.Client
+       DataSubs map[chan torrent.DataSpec]struct{}
+       sync.Mutex
+}
+
+func (tfs *TorrentFS) publishData() {
+       for {
+               spec := <-tfs.Client.DataReady
+               log.Printf("ready data: %s", spec)
+               tfs.Lock()
+               for ds := range tfs.DataSubs {
+                       ds <- spec
+               }
+               tfs.Unlock()
+       }
+}
+
+func (tfs *TorrentFS) SubscribeData() chan torrent.DataSpec {
+       ch := make(chan torrent.DataSpec)
+       tfs.Lock()
+       tfs.DataSubs[ch] = struct{}{}
+       tfs.Unlock()
+       return ch
+}
+
+func (tfs *TorrentFS) UnsubscribeData(ch chan torrent.DataSpec) {
+       go func() {
+               for _ = range ch {
+               }
+       }()
+       tfs.Lock()
+       delete(tfs.DataSubs, ch)
+       tfs.Unlock()
+       close(ch)
 }
 
 type rootNode struct {
@@ -45,12 +83,14 @@ type rootNode struct {
 type node struct {
        path     []string
        metaInfo *metainfo.MetaInfo
-       client   *torrent.Client
+       FS       *TorrentFS
+       InfoHash torrent.InfoHash
 }
 
 type fileNode struct {
        node
-       size uint64
+       size          uint64
+       TorrentOffset int64
 }
 
 func (fn fileNode) Attr() (attr fuse.Attr) {
@@ -59,10 +99,49 @@ func (fn fileNode) Attr() (attr fuse.Attr) {
        return
 }
 
+func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fusefs.Intr) fuse.Error {
+       if req.Dir {
+               panic("hodor")
+       }
+       dataSpecs := fn.FS.SubscribeData()
+       defer fn.FS.UnsubscribeData(dataSpecs)
+       data := make([]byte, func() int {
+               _len := int64(fn.size) - req.Offset
+               if int64(req.Size) < _len {
+                       return req.Size
+               } else {
+                       return int(_len)
+               }
+       }())
+       for {
+               n, err := fn.FS.Client.TorrentReadAt(torrent.BytesInfoHash(fn.metaInfo.InfoHash), fn.TorrentOffset+req.Offset, data)
+               switch err {
+               case nil:
+                       resp.Data = data[:n]
+                       return nil
+               case torrent.ErrDataNotReady:
+                       select {
+                       case <-dataSpecs:
+                       case <-intr:
+                               return fuse.Errno(syscall.EINTR)
+                       }
+               default:
+                       log.Print(err)
+                       return fuse.EIO
+               }
+       }
+}
+
 type dirNode struct {
        node
 }
 
+var (
+       _ fusefs.HandleReadDirer = dirNode{}
+
+       _ fusefs.HandleReader = fileNode{}
+)
+
 func isSubPath(parent, child []string) bool {
        if len(child) <= len(parent) {
                return false
@@ -100,22 +179,23 @@ func (dn dirNode) ReadDir(intr fusefs.Intr) (des []fuse.Dirent, err fuse.Error)
 }
 
 func (dn dirNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err fuse.Error) {
+       var torrentOffset int64
        for _, fi := range dn.metaInfo.Files {
                if !isSubPath(dn.path, fi.Path) {
+                       torrentOffset += fi.Length
                        continue
                }
                if fi.Path[len(dn.path)] != name {
+                       torrentOffset += fi.Length
                        continue
                }
-               __node := node{
-                       path:     append(dn.path, name),
-                       metaInfo: dn.metaInfo,
-                       client:   dn.client,
-               }
+               __node := dn.node
+               __node.path = append(__node.path, name)
                if len(fi.Path) == len(dn.path)+1 {
                        _node = fileNode{
-                               node: __node,
-                               size: uint64(fi.Length),
+                               node:          __node,
+                               size:          uint64(fi.Length),
+                               TorrentOffset: torrentOffset,
                        }
                } else {
                        _node = dirNode{__node}
@@ -143,10 +223,11 @@ func (me rootNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err
                if metaInfo.Name == name {
                        __node := node{
                                metaInfo: metaInfo,
-                               client:   me.fs.Client,
+                               FS:       me.fs,
+                               InfoHash: torrent.BytesInfoHash(metaInfo.InfoHash),
                        }
                        if isSingleFileTorrent(metaInfo) {
-                               _node = fileNode{__node, uint64(metaInfo.Files[0].Length)}
+                               _node = fileNode{__node, uint64(metaInfo.Files[0].Length), 0}
                        } else {
                                _node = dirNode{__node}
                        }
@@ -187,8 +268,17 @@ func (tfs *TorrentFS) Root() (fusefs.Node, fuse.Error) {
 }
 
 func main() {
+       pprofAddr := flag.String("pprofAddr", "", "pprof HTTP server bind address")
        flag.Parse()
-       client := torrent.NewClient(downloadDir)
+       if *pprofAddr != "" {
+               go http.ListenAndServe(*pprofAddr, nil)
+       }
+       defer profile.Start(profile.CPUProfile).Stop()
+       client := &torrent.Client{
+               DataDir:   downloadDir,
+               DataReady: make(chan torrent.DataSpec),
+       }
+       client.Start()
        torrentDir, err := os.Open(torrentPath)
        defer torrentDir.Close()
        if err != nil {
@@ -212,6 +302,10 @@ func main() {
        if err != nil {
                log.Fatal(err)
        }
-       fs := &TorrentFS{client}
+       fs := &TorrentFS{
+               Client:   client,
+               DataSubs: make(map[chan torrent.DataSpec]struct{}),
+       }
+       go fs.publishData()
        fusefs.Serve(conn, fs)
 }