type piece struct {
State pieceState
Hash pieceSum
- PendingChunkSpecs map[chunkSpec]struct{}
+ PendingChunkSpecs map[ChunkSpec]struct{}
}
-type chunkSpec struct {
+type ChunkSpec struct {
Begin, Length peer_protocol.Integer
}
-type request struct {
+type Request struct {
Index peer_protocol.Integer
- chunkSpec
+ ChunkSpec
}
type connection struct {
Interested bool
Choked bool
- Requests map[request]struct{}
+ Requests map[Request]struct{}
PeerId [20]byte
PeerInterested bool
PeerChoked bool
- PeerRequests map[request]struct{}
+ PeerRequests map[Request]struct{}
PeerExtensions [8]byte
PeerPieces []bool
}
c.post <- msg
}
-func (c *connection) Request(chunk request) bool {
+func (c *connection) Request(chunk Request) bool {
if len(c.Requests) >= maxRequests {
return false
}
})
}
if c.Requests == nil {
- c.Requests = make(map[request]struct{}, maxRequests)
+ c.Requests = make(map[Request]struct{}, maxRequests)
}
c.Requests[chunk] = struct{}{}
return true
return
}
-func (t *Torrent) pieceChunkSpecs(index int) (cs map[chunkSpec]struct{}) {
- cs = make(map[chunkSpec]struct{}, (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
- c := chunkSpec{
+func (t *Torrent) pieceChunkSpecs(index int) (cs map[ChunkSpec]struct{}) {
+ cs = make(map[ChunkSpec]struct{}, (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
+ c := ChunkSpec{
Begin: 0,
}
- for left := peer_protocol.Integer(t.PieceSize(index)); left > 0; left -= c.Length {
+ for left := peer_protocol.Integer(t.PieceLength(index)); left > 0; left -= c.Length {
c.Length = left
if c.Length > chunkSize {
c.Length = chunkSize
return
}
-func (t *Torrent) requestHeat() (ret map[request]int) {
- ret = make(map[request]int)
+func (t *Torrent) requestHeat() (ret map[Request]int) {
+ ret = make(map[Request]int)
for _, conn := range t.Conns {
for req, _ := range conn.Requests {
ret[req]++
Port int
}
-func (t *Torrent) PieceSize(piece int) (size int64) {
+func (t *Torrent) PieceLength(piece int) (len_ peer_protocol.Integer) {
if piece == len(t.Pieces)-1 {
- size = t.Data.Size() % t.MetaInfo.PieceLength
+ len_ = peer_protocol.Integer(t.Data.Size() % t.MetaInfo.PieceLength)
}
- if size == 0 {
- size = t.MetaInfo.PieceLength
+ if len_ == 0 {
+ len_ = peer_protocol.Integer(t.MetaInfo.PieceLength)
}
return
}
if err != nil {
panic(err)
}
- if n != t.PieceSize(piece) {
- panic(fmt.Sprintf("hashed wrong number of bytes: expected %d; did %d; piece %d", t.PieceSize(piece), n, piece))
+ if peer_protocol.Integer(n) != t.PieceLength(piece) {
+ panic(fmt.Sprintf("hashed wrong number of bytes: expected %d; did %d; piece %d", t.PieceLength(piece), n, piece))
}
copyHashSum(ps[:], hash.Sum(nil))
return
}
+type DataSpec struct {
+ InfoHash
+ Request
+}
+
type Client struct {
DataDir string
HalfOpenLimit int
PeerId [20]byte
+ DataReady chan DataSpec
halfOpen int
torrents map[InfoHash]*Torrent
actorTask chan func()
}
-func NewClient(dataDir string) *Client {
- c := &Client{
- DataDir: dataDir,
- HalfOpenLimit: 10,
+var (
+ ErrDataNotReady = errors.New("data not ready")
+)
- torrents: make(map[InfoHash]*Torrent),
+func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
+ done := make(chan struct{})
+ cl.withContext(func() {
+ defer func() {
+ close(done)
+ }()
+ t := cl.torrent(ih)
+ if t == nil {
+ err = errors.New("unknown torrent")
+ return
+ }
+ index := int(off / int64(t.PieceLength(0)))
+ piece := t.Pieces[index]
+ pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
+ switch piece.State {
+ case pieceStateComplete:
+ high := int(t.PieceLength(index) - peer_protocol.Integer(off%int64(t.PieceLength(0))))
+ if high < len(p) {
+ p = p[:high]
+ }
+ case pieceStateIncomplete:
+ for cs, _ := range piece.PendingChunkSpecs {
+ chunkOff := int64(pieceOff - cs.Begin)
+ if 0 <= chunkOff && chunkOff < int64(cs.Length) {
+ // read begins in a pending chunk
+ err = ErrDataNotReady
+ return
+ }
+ // pending chunk caps available data
+ if chunkOff < 0 && int64(len(p)) > -chunkOff {
+ p = p[:-chunkOff]
+ }
+ }
+ default:
+ err = ErrDataNotReady
+ return
+ }
+ n, err = t.Data.ReadAt(p, off)
+ })
+ <-done
+ log.Println(n, err)
+ return
+}
- noTorrents: make(chan struct{}),
- addTorrent: make(chan *Torrent),
- torrentFinished: make(chan InfoHash),
- actorTask: make(chan func()),
- }
+func (c *Client) Start() {
+ c.torrents = make(map[InfoHash]*Torrent)
+ c.noTorrents = make(chan struct{})
+ c.addTorrent = make(chan *Torrent)
+ c.torrentFinished = make(chan InfoHash)
+ c.actorTask = make(chan func())
o := copy(c.PeerId[:], BEP20)
_, err := rand.Read(c.PeerId[o:])
if err != nil {
panic("error generating peer id")
}
go c.run()
- return c
}
func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan, err error) {
case peer_protocol.Have:
me.peerGotPiece(torrent, conn, int(msg.Index))
case peer_protocol.Request:
- conn.PeerRequests[request{
+ conn.PeerRequests[Request{
Index: msg.Index,
- chunkSpec: chunkSpec{msg.Begin, msg.Length},
+ ChunkSpec: ChunkSpec{msg.Begin, msg.Length},
}] = struct{}{}
case peer_protocol.Bitfield:
if len(msg.Bitfield) < len(torrent.Pieces) {
}
}
case peer_protocol.Piece:
- request_ := request{msg.Index, chunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
+ request_ := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
if _, ok := conn.Requests[request_]; !ok {
err = errors.New("unexpected piece")
break
}
delete(conn.Requests, request_)
- if _, ok := torrent.Pieces[request_.Index].PendingChunkSpecs[request_.chunkSpec]; !ok {
+ if _, ok := torrent.Pieces[request_.Index].PendingChunkSpecs[request_.ChunkSpec]; !ok {
log.Printf("got unnecessary chunk: %s", request_)
break
}
if err != nil {
break
}
- delete(torrent.Pieces[request_.Index].PendingChunkSpecs, request_.chunkSpec)
+ delete(torrent.Pieces[request_.Index].PendingChunkSpecs, request_.ChunkSpec)
+ me.downloadedChunk(torrent, request_)
if len(torrent.Pieces[request_.Index].PendingChunkSpecs) == 0 {
torrent.Pieces[request_.Index].State = pieceStateUnknown
go me.verifyPiece(torrent, int(request_.Index))
func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
torrent := &Torrent{
InfoHash: BytesInfoHash(metaInfo.InfoHash),
+ MetaInfo: metaInfo,
}
for offset := 0; offset < len(metaInfo.Pieces); offset += PieceHash.Size() {
hash := metaInfo.Pieces[offset : offset+PieceHash.Size()]
if err != nil {
return err
}
- torrent.MetaInfo = metaInfo
me.addTorrent <- torrent
return nil
}
<-me.noTorrents
}
-func (me *Client) Close() {
+func (me *Client) Stop() {
}
func (me *Client) withContext(f func()) {
continue
}
for chunkSpec, _ := range torrent.Pieces[index].PendingChunkSpecs {
- request := request{peer_protocol.Integer(index), chunkSpec}
+ request := Request{peer_protocol.Integer(index), chunkSpec}
if heat := requestHeatMap[request]; heat > 0 {
continue
}
}
}
//conn.SetInterested(false)
+}
+func (me *Client) downloadedChunk(t *Torrent, chunk Request) {
+}
+
+func (cl *Client) dataReady(ds DataSpec) {
+ if cl.DataReady == nil {
+ return
+ }
+ go func() {
+ cl.DataReady <- ds
+ }()
}
func (me *Client) pieceHashed(ih InfoHash, piece int, correct bool) {
return
}
torrent.Pieces[piece].State = newState
- if newState == pieceStateIncomplete {
+ switch newState {
+ case pieceStateIncomplete:
torrent.Pieces[piece].PendingChunkSpecs = torrent.pieceChunkSpecs(piece)
+ case pieceStateComplete:
+ log.Print(piece)
+ me.dataReady(DataSpec{
+ torrent.InfoHash,
+ Request{
+ peer_protocol.Integer(piece),
+ ChunkSpec{0, peer_protocol.Integer(torrent.PieceLength(piece))},
+ },
+ })
}
for _, conn := range torrent.Conns {
if correct {
fusefs "bazil.org/fuse/fs"
"bitbucket.org/anacrolix/go.torrent"
"flag"
+ "github.com/davecheney/profile"
metainfo "github.com/nsf/libtorgo/torrent"
"log"
+ "net/http"
+ _ "net/http/pprof"
"os"
"os/user"
"path/filepath"
+ "sync"
+ "syscall"
)
var (
}
type TorrentFS struct {
- Client *torrent.Client
+ Client *torrent.Client
+ DataSubs map[chan torrent.DataSpec]struct{}
+ sync.Mutex
+}
+
+func (tfs *TorrentFS) publishData() {
+ for {
+ spec := <-tfs.Client.DataReady
+ log.Printf("ready data: %s", spec)
+ tfs.Lock()
+ for ds := range tfs.DataSubs {
+ ds <- spec
+ }
+ tfs.Unlock()
+ }
+}
+
+func (tfs *TorrentFS) SubscribeData() chan torrent.DataSpec {
+ ch := make(chan torrent.DataSpec)
+ tfs.Lock()
+ tfs.DataSubs[ch] = struct{}{}
+ tfs.Unlock()
+ return ch
+}
+
+func (tfs *TorrentFS) UnsubscribeData(ch chan torrent.DataSpec) {
+ go func() {
+ for _ = range ch {
+ }
+ }()
+ tfs.Lock()
+ delete(tfs.DataSubs, ch)
+ tfs.Unlock()
+ close(ch)
}
type rootNode struct {
type node struct {
path []string
metaInfo *metainfo.MetaInfo
- client *torrent.Client
+ FS *TorrentFS
+ InfoHash torrent.InfoHash
}
type fileNode struct {
node
- size uint64
+ size uint64
+ TorrentOffset int64
}
func (fn fileNode) Attr() (attr fuse.Attr) {
return
}
+func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fusefs.Intr) fuse.Error {
+ if req.Dir {
+ panic("hodor")
+ }
+ dataSpecs := fn.FS.SubscribeData()
+ defer fn.FS.UnsubscribeData(dataSpecs)
+ data := make([]byte, func() int {
+ _len := int64(fn.size) - req.Offset
+ if int64(req.Size) < _len {
+ return req.Size
+ } else {
+ return int(_len)
+ }
+ }())
+ for {
+ n, err := fn.FS.Client.TorrentReadAt(torrent.BytesInfoHash(fn.metaInfo.InfoHash), fn.TorrentOffset+req.Offset, data)
+ switch err {
+ case nil:
+ resp.Data = data[:n]
+ return nil
+ case torrent.ErrDataNotReady:
+ select {
+ case <-dataSpecs:
+ case <-intr:
+ return fuse.Errno(syscall.EINTR)
+ }
+ default:
+ log.Print(err)
+ return fuse.EIO
+ }
+ }
+}
+
type dirNode struct {
node
}
+var (
+ _ fusefs.HandleReadDirer = dirNode{}
+
+ _ fusefs.HandleReader = fileNode{}
+)
+
func isSubPath(parent, child []string) bool {
if len(child) <= len(parent) {
return false
}
func (dn dirNode) Lookup(name string, intr fusefs.Intr) (_node fusefs.Node, err fuse.Error) {
+ var torrentOffset int64
for _, fi := range dn.metaInfo.Files {
if !isSubPath(dn.path, fi.Path) {
+ torrentOffset += fi.Length
continue
}
if fi.Path[len(dn.path)] != name {
+ torrentOffset += fi.Length
continue
}
- __node := node{
- path: append(dn.path, name),
- metaInfo: dn.metaInfo,
- client: dn.client,
- }
+ __node := dn.node
+ __node.path = append(__node.path, name)
if len(fi.Path) == len(dn.path)+1 {
_node = fileNode{
- node: __node,
- size: uint64(fi.Length),
+ node: __node,
+ size: uint64(fi.Length),
+ TorrentOffset: torrentOffset,
}
} else {
_node = dirNode{__node}
if metaInfo.Name == name {
__node := node{
metaInfo: metaInfo,
- client: me.fs.Client,
+ FS: me.fs,
+ InfoHash: torrent.BytesInfoHash(metaInfo.InfoHash),
}
if isSingleFileTorrent(metaInfo) {
- _node = fileNode{__node, uint64(metaInfo.Files[0].Length)}
+ _node = fileNode{__node, uint64(metaInfo.Files[0].Length), 0}
} else {
_node = dirNode{__node}
}
}
func main() {
+ pprofAddr := flag.String("pprofAddr", "", "pprof HTTP server bind address")
flag.Parse()
- client := torrent.NewClient(downloadDir)
+ if *pprofAddr != "" {
+ go http.ListenAndServe(*pprofAddr, nil)
+ }
+ defer profile.Start(profile.CPUProfile).Stop()
+ client := &torrent.Client{
+ DataDir: downloadDir,
+ DataReady: make(chan torrent.DataSpec),
+ }
+ client.Start()
torrentDir, err := os.Open(torrentPath)
defer torrentDir.Close()
if err != nil {
if err != nil {
log.Fatal(err)
}
- fs := &TorrentFS{client}
+ fs := &TorrentFS{
+ Client: client,
+ DataSubs: make(map[chan torrent.DataSpec]struct{}),
+ }
+ go fs.publishData()
fusefs.Serve(conn, fs)
}