From 5c0ff3ff5fc6cb8893ce7a2441b2fa80ee5ea37b Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 13 Oct 2013 23:16:21 +1100 Subject: [PATCH] Torrent client publishes data ready information; torrentfs supports file reads --- client.go | 149 ++++++++++++++++++++++++++++++------------ cmd/torrentfs/main.go | 122 ++++++++++++++++++++++++++++++---- 2 files changed, 217 insertions(+), 54 deletions(-) diff --git a/client.go b/client.go index af67137c..3e92663d 100644 --- a/client.go +++ b/client.go @@ -53,16 +53,16 @@ const ( type piece struct { State pieceState Hash pieceSum - PendingChunkSpecs map[chunkSpec]struct{} + PendingChunkSpecs map[ChunkSpec]struct{} } -type chunkSpec struct { +type ChunkSpec struct { Begin, Length peer_protocol.Integer } -type request struct { +type Request struct { Index peer_protocol.Integer - chunkSpec + ChunkSpec } type connection struct { @@ -72,12 +72,12 @@ type connection struct { Interested bool Choked bool - Requests map[request]struct{} + Requests map[Request]struct{} PeerId [20]byte PeerInterested bool PeerChoked bool - PeerRequests map[request]struct{} + PeerRequests map[Request]struct{} PeerExtensions [8]byte PeerPieces []bool } @@ -93,7 +93,7 @@ func (c *connection) Post(msg encoding.BinaryMarshaler) { c.post <- msg } -func (c *connection) Request(chunk request) bool { +func (c *connection) Request(chunk Request) bool { if len(c.Requests) >= maxRequests { return false } @@ -106,7 +106,7 @@ func (c *connection) Request(chunk request) bool { }) } if c.Requests == nil { - c.Requests = make(map[request]struct{}, maxRequests) + c.Requests = make(map[Request]struct{}, maxRequests) } c.Requests[chunk] = struct{}{} return true @@ -188,12 +188,12 @@ func (t *Torrent) bitfield() (bf []bool) { return } -func (t *Torrent) pieceChunkSpecs(index int) (cs map[chunkSpec]struct{}) { - cs = make(map[chunkSpec]struct{}, (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize) - c := chunkSpec{ +func (t *Torrent) pieceChunkSpecs(index int) (cs map[ChunkSpec]struct{}) { + cs = make(map[ChunkSpec]struct{}, (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize) + c := ChunkSpec{ Begin: 0, } - for left := peer_protocol.Integer(t.PieceSize(index)); left > 0; left -= c.Length { + for left := peer_protocol.Integer(t.PieceLength(index)); left > 0; left -= c.Length { c.Length = left if c.Length > chunkSize { c.Length = chunkSize @@ -204,8 +204,8 @@ func (t *Torrent) pieceChunkSpecs(index int) (cs map[chunkSpec]struct{}) { return } -func (t *Torrent) requestHeat() (ret map[request]int) { - ret = make(map[request]int) +func (t *Torrent) requestHeat() (ret map[Request]int) { + ret = make(map[Request]int) for _, conn := range t.Conns { for req, _ := range conn.Requests { ret[req]++ @@ -220,12 +220,12 @@ type Peer struct { Port int } -func (t *Torrent) PieceSize(piece int) (size int64) { +func (t *Torrent) PieceLength(piece int) (len_ peer_protocol.Integer) { if piece == len(t.Pieces)-1 { - size = t.Data.Size() % t.MetaInfo.PieceLength + len_ = peer_protocol.Integer(t.Data.Size() % t.MetaInfo.PieceLength) } - if size == 0 { - size = t.MetaInfo.PieceLength + if len_ == 0 { + len_ = peer_protocol.Integer(t.MetaInfo.PieceLength) } return } @@ -236,17 +236,23 @@ func (t *Torrent) HashPiece(piece int) (ps pieceSum) { if err != nil { panic(err) } - if n != t.PieceSize(piece) { - panic(fmt.Sprintf("hashed wrong number of bytes: expected %d; did %d; piece %d", t.PieceSize(piece), n, piece)) + if peer_protocol.Integer(n) != t.PieceLength(piece) { + panic(fmt.Sprintf("hashed wrong number of bytes: expected %d; did %d; piece %d", t.PieceLength(piece), n, piece)) } copyHashSum(ps[:], hash.Sum(nil)) return } +type DataSpec struct { + InfoHash + Request +} + type Client struct { DataDir string HalfOpenLimit int PeerId [20]byte + DataReady chan DataSpec halfOpen int torrents map[InfoHash]*Torrent @@ -257,25 +263,66 @@ type Client struct { actorTask chan func() } -func NewClient(dataDir string) *Client { - c := &Client{ - DataDir: dataDir, - HalfOpenLimit: 10, +var ( + ErrDataNotReady = errors.New("data not ready") +) - torrents: make(map[InfoHash]*Torrent), +func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) { + done := make(chan struct{}) + cl.withContext(func() { + defer func() { + close(done) + }() + t := cl.torrent(ih) + if t == nil { + err = errors.New("unknown torrent") + return + } + index := int(off / int64(t.PieceLength(0))) + piece := t.Pieces[index] + pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0))) + switch piece.State { + case pieceStateComplete: + high := int(t.PieceLength(index) - peer_protocol.Integer(off%int64(t.PieceLength(0)))) + if high < len(p) { + p = p[:high] + } + case pieceStateIncomplete: + for cs, _ := range piece.PendingChunkSpecs { + chunkOff := int64(pieceOff - cs.Begin) + if 0 <= chunkOff && chunkOff < int64(cs.Length) { + // read begins in a pending chunk + err = ErrDataNotReady + return + } + // pending chunk caps available data + if chunkOff < 0 && int64(len(p)) > -chunkOff { + p = p[:-chunkOff] + } + } + default: + err = ErrDataNotReady + return + } + n, err = t.Data.ReadAt(p, off) + }) + <-done + log.Println(n, err) + return +} - noTorrents: make(chan struct{}), - addTorrent: make(chan *Torrent), - torrentFinished: make(chan InfoHash), - actorTask: make(chan func()), - } +func (c *Client) Start() { + c.torrents = make(map[InfoHash]*Torrent) + c.noTorrents = make(chan struct{}) + c.addTorrent = make(chan *Torrent) + c.torrentFinished = make(chan InfoHash) + c.actorTask = make(chan func()) o := copy(c.PeerId[:], BEP20) _, err := rand.Read(c.PeerId[o:]) if err != nil { panic("error generating peer id") } go c.run() - return c } func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan, err error) { @@ -482,9 +529,9 @@ func (me *Client) runConnection(torrent *Torrent, conn *connection) error { case peer_protocol.Have: me.peerGotPiece(torrent, conn, int(msg.Index)) case peer_protocol.Request: - conn.PeerRequests[request{ + conn.PeerRequests[Request{ Index: msg.Index, - chunkSpec: chunkSpec{msg.Begin, msg.Length}, + ChunkSpec: ChunkSpec{msg.Begin, msg.Length}, }] = struct{}{} case peer_protocol.Bitfield: if len(msg.Bitfield) < len(torrent.Pieces) { @@ -502,13 +549,13 @@ func (me *Client) runConnection(torrent *Torrent, conn *connection) error { } } case peer_protocol.Piece: - request_ := request{msg.Index, chunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}} + request_ := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}} if _, ok := conn.Requests[request_]; !ok { err = errors.New("unexpected piece") break } delete(conn.Requests, request_) - if _, ok := torrent.Pieces[request_.Index].PendingChunkSpecs[request_.chunkSpec]; !ok { + if _, ok := torrent.Pieces[request_.Index].PendingChunkSpecs[request_.ChunkSpec]; !ok { log.Printf("got unnecessary chunk: %s", request_) break } @@ -516,7 +563,8 @@ func (me *Client) runConnection(torrent *Torrent, conn *connection) error { if err != nil { break } - delete(torrent.Pieces[request_.Index].PendingChunkSpecs, request_.chunkSpec) + delete(torrent.Pieces[request_.Index].PendingChunkSpecs, request_.ChunkSpec) + me.downloadedChunk(torrent, request_) if len(torrent.Pieces[request_.Index].PendingChunkSpecs) == 0 { torrent.Pieces[request_.Index].State = pieceStateUnknown go me.verifyPiece(torrent, int(request_.Index)) @@ -589,6 +637,7 @@ func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) (err error) { func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error { torrent := &Torrent{ InfoHash: BytesInfoHash(metaInfo.InfoHash), + MetaInfo: metaInfo, } for offset := 0; offset < len(metaInfo.Pieces); offset += PieceHash.Size() { hash := metaInfo.Pieces[offset : offset+PieceHash.Size()] @@ -604,7 +653,6 @@ func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error { if err != nil { return err } - torrent.MetaInfo = metaInfo me.addTorrent <- torrent return nil } @@ -613,7 +661,7 @@ func (me *Client) WaitAll() { <-me.noTorrents } -func (me *Client) Close() { +func (me *Client) Stop() { } func (me *Client) withContext(f func()) { @@ -633,7 +681,7 @@ func (me *Client) replenishConnRequests(torrent *Torrent, conn *connection) { continue } for chunkSpec, _ := range torrent.Pieces[index].PendingChunkSpecs { - request := request{peer_protocol.Integer(index), chunkSpec} + request := Request{peer_protocol.Integer(index), chunkSpec} if heat := requestHeatMap[request]; heat > 0 { continue } @@ -644,7 +692,18 @@ func (me *Client) replenishConnRequests(torrent *Torrent, conn *connection) { } } //conn.SetInterested(false) +} +func (me *Client) downloadedChunk(t *Torrent, chunk Request) { +} + +func (cl *Client) dataReady(ds DataSpec) { + if cl.DataReady == nil { + return + } + go func() { + cl.DataReady <- ds + }() } func (me *Client) pieceHashed(ih InfoHash, piece int, correct bool) { @@ -661,8 +720,18 @@ func (me *Client) pieceHashed(ih InfoHash, piece int, correct bool) { return } torrent.Pieces[piece].State = newState - if newState == pieceStateIncomplete { + switch newState { + case pieceStateIncomplete: torrent.Pieces[piece].PendingChunkSpecs = torrent.pieceChunkSpecs(piece) + case pieceStateComplete: + log.Print(piece) + me.dataReady(DataSpec{ + torrent.InfoHash, + Request{ + peer_protocol.Integer(piece), + ChunkSpec{0, peer_protocol.Integer(torrent.PieceLength(piece))}, + }, + }) } for _, conn := range torrent.Conns { if correct { diff --git a/cmd/torrentfs/main.go b/cmd/torrentfs/main.go index e567aa29..51cd516b 100644 --- a/cmd/torrentfs/main.go +++ b/cmd/torrentfs/main.go @@ -5,11 +5,16 @@ import ( fusefs "bazil.org/fuse/fs" "bitbucket.org/anacrolix/go.torrent" "flag" + "github.com/davecheney/profile" metainfo "github.com/nsf/libtorgo/torrent" "log" + "net/http" + _ "net/http/pprof" "os" "os/user" "path/filepath" + "sync" + "syscall" ) var ( @@ -35,7 +40,40 @@ func init() { } type TorrentFS struct { - Client *torrent.Client + Client *torrent.Client + DataSubs map[chan torrent.DataSpec]struct{} + sync.Mutex +} + +func (tfs *TorrentFS) publishData() { + for { + spec := <-tfs.Client.DataReady + log.Printf("ready data: %s", spec) + tfs.Lock() + for ds := range tfs.DataSubs { + ds <- spec + } + tfs.Unlock() + } +} + +func (tfs *TorrentFS) SubscribeData() chan torrent.DataSpec { + ch := make(chan torrent.DataSpec) + tfs.Lock() + tfs.DataSubs[ch] = struct{}{} + tfs.Unlock() + return ch +} + +func (tfs *TorrentFS) UnsubscribeData(ch chan torrent.DataSpec) { + go func() { + for _ = range ch { + } + }() + tfs.Lock() + delete(tfs.DataSubs, ch) + tfs.Unlock() + close(ch) } type rootNode struct { @@ -45,12 +83,14 @@ type rootNode struct { type node struct { path []string metaInfo *metainfo.MetaInfo - client *torrent.Client + FS *TorrentFS + InfoHash torrent.InfoHash } type fileNode struct { node - size uint64 + size uint64 + TorrentOffset int64 } func (fn fileNode) Attr() (attr fuse.Attr) { @@ -59,10 +99,49 @@ func (fn fileNode) Attr() (attr fuse.Attr) { return } +func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fusefs.Intr) fuse.Error { + if req.Dir { + panic("hodor") + } + dataSpecs := fn.FS.SubscribeData() + defer fn.FS.UnsubscribeData(dataSpecs) + data := make([]byte, func() int { + _len := int64(fn.size) - req.Offset + if int64(req.Size) < _len { + return req.Size + } else { + return int(_len) + } + }()) + for { + n, err := fn.FS.Client.TorrentReadAt(torrent.BytesInfoHash(fn.metaInfo.InfoHash), fn.TorrentOffset+req.Offset, data) + switch err { + case nil: + resp.Data = data[:n] + return nil + case torrent.ErrDataNotReady: + select { + case <-dataSpecs: + case <-intr: + return fuse.Errno(syscall.EINTR) + } + default: + log.Print(err) + return fuse.EIO + } + } +} + type dirNode struct { node } +var ( + _ fusefs.HandleReadDirer = dirNode{} + + _ fusefs.HandleReader = fileNode{} +) + func isSubPath(parent, child []string) bool { if len(child) <= len(parent) { return false @@ -100,22 +179,23 @@ func (dn dirNode) ReadDir(intr fusefs.Intr) (des []fuse.Dirent, err fuse.Error) } func (dn dirNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err fuse.Error) { + var torrentOffset int64 for _, fi := range dn.metaInfo.Files { if !isSubPath(dn.path, fi.Path) { + torrentOffset += fi.Length continue } if fi.Path[len(dn.path)] != name { + torrentOffset += fi.Length continue } - __node := node{ - path: append(dn.path, name), - metaInfo: dn.metaInfo, - client: dn.client, - } + __node := dn.node + __node.path = append(__node.path, name) if len(fi.Path) == len(dn.path)+1 { _node = fileNode{ - node: __node, - size: uint64(fi.Length), + node: __node, + size: uint64(fi.Length), + TorrentOffset: torrentOffset, } } else { _node = dirNode{__node} @@ -143,10 +223,11 @@ func (me rootNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err if metaInfo.Name == name { __node := node{ metaInfo: metaInfo, - client: me.fs.Client, + FS: me.fs, + InfoHash: torrent.BytesInfoHash(metaInfo.InfoHash), } if isSingleFileTorrent(metaInfo) { - _node = fileNode{__node, uint64(metaInfo.Files[0].Length)} + _node = fileNode{__node, uint64(metaInfo.Files[0].Length), 0} } else { _node = dirNode{__node} } @@ -187,8 +268,17 @@ func (tfs *TorrentFS) Root() (fusefs.Node, fuse.Error) { } func main() { + pprofAddr := flag.String("pprofAddr", "", "pprof HTTP server bind address") flag.Parse() - client := torrent.NewClient(downloadDir) + if *pprofAddr != "" { + go http.ListenAndServe(*pprofAddr, nil) + } + defer profile.Start(profile.CPUProfile).Stop() + client := &torrent.Client{ + DataDir: downloadDir, + DataReady: make(chan torrent.DataSpec), + } + client.Start() torrentDir, err := os.Open(torrentPath) defer torrentDir.Close() if err != nil { @@ -212,6 +302,10 @@ func main() { if err != nil { log.Fatal(err) } - fs := &TorrentFS{client} + fs := &TorrentFS{ + Client: client, + DataSubs: make(map[chan torrent.DataSpec]struct{}), + } + go fs.publishData() fusefs.Serve(conn, fs) } -- 2.48.1