From: Matt Joiner Date: Mon, 14 Oct 2013 14:39:12 +0000 (+1100) Subject: Implement prioritizing of torrent data regions based on FS activity X-Git-Tag: v1.0.0~1796 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=0a5043ba69d5ce4c20ff9f0387e34d38b79ba346;p=btrtrc.git Implement prioritizing of torrent data regions based on FS activity --- diff --git a/client.go b/client.go index 3e92663d..5509da6f 100644 --- a/client.go +++ b/client.go @@ -82,7 +82,12 @@ type connection struct { PeerPieces []bool } -func (c *connection) PeerHasPiece(index int) bool { +func (c *connection) Close() { + c.Socket.Close() + close(c.post) +} + +func (c *connection) PeerHasPiece(index peer_protocol.Integer) bool { if c.PeerPieces == nil { return false } @@ -97,6 +102,10 @@ func (c *connection) Request(chunk Request) bool { if len(c.Requests) >= maxRequests { return false } + c.SetInterested(true) + if c.PeerChoked { + return false + } if _, ok := c.Requests[chunk]; !ok { c.Post(peer_protocol.Message{ Type: peer_protocol.Request, @@ -131,10 +140,12 @@ func (c *connection) SetInterested(interested bool) { func (conn *connection) writer() { for { b := <-conn.write + if b == nil { + break + } n, err := conn.Socket.Write(b) if err != nil { log.Print(err) - close(conn.write) break } if n != len(b) { @@ -147,6 +158,7 @@ func (conn *connection) writer() { func (conn *connection) writeOptimizer() { pending := list.New() var nextWrite []byte + defer close(conn.write) for { write := conn.write if pending.Len() == 0 { @@ -159,7 +171,10 @@ func (conn *connection) writeOptimizer() { } } select { - case msg := <-conn.post: + case msg, ok := <-conn.post: + if !ok { + return + } pending.PushBack(msg) case write <- nextWrite: pending.Remove(pending.Front()) @@ -168,12 +183,53 @@ func (conn *connection) writeOptimizer() { } type Torrent struct { - InfoHash InfoHash - Pieces []piece - Data MMapSpan - MetaInfo *metainfo.MetaInfo - Conns []*connection - Peers []Peer + InfoHash InfoHash + Pieces []piece + Data MMapSpan + MetaInfo *metainfo.MetaInfo + Conns []*connection + Peers []Peer + Priorities *list.List +} + +func (t *Torrent) PrioritizeDataRegion(off, len_ int64) { + newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize) + for len_ > 0 { + index := peer_protocol.Integer(off / int64(t.PieceLength(0))) + pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0))) + piece := t.Pieces[index] + if piece.State == pieceStateComplete { + adv := int64(t.PieceLength(index) - pieceOff) + off += adv + len_ -= adv + continue + } + chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize} + adv := int64(chunkSize - pieceOff%chunkSize) + off += adv + len_ -= adv + switch piece.State { + case pieceStateIncomplete: + if _, ok := piece.PendingChunkSpecs[chunk]; !ok { + continue + } + case pieceStateUnknown: + default: + panic("unexpected piece state") + } + newPriorities = append(newPriorities, Request{index, chunk}) + } + if len(newPriorities) < 1 { + return + } + log.Print(newPriorities) + if t.Priorities == nil { + t.Priorities = list.New() + } + t.Priorities.PushFront(newPriorities[0]) + for _, req := range newPriorities[1:] { + t.Priorities.PushBack(req) + } } func (t *Torrent) WriteChunk(piece int, begin int64, data []byte) (err error) { @@ -188,7 +244,7 @@ func (t *Torrent) bitfield() (bf []bool) { return } -func (t *Torrent) pieceChunkSpecs(index int) (cs map[ChunkSpec]struct{}) { +func (t *Torrent) pieceChunkSpecs(index peer_protocol.Integer) (cs map[ChunkSpec]struct{}) { cs = make(map[ChunkSpec]struct{}, (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize) c := ChunkSpec{ Begin: 0, @@ -220,8 +276,8 @@ type Peer struct { Port int } -func (t *Torrent) PieceLength(piece int) (len_ peer_protocol.Integer) { - if piece == len(t.Pieces)-1 { +func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) { + if int(piece) == len(t.Pieces)-1 { len_ = peer_protocol.Integer(t.Data.Size() % t.MetaInfo.PieceLength) } if len_ == 0 { @@ -230,7 +286,7 @@ func (t *Torrent) PieceLength(piece int) (len_ peer_protocol.Integer) { return } -func (t *Torrent) HashPiece(piece int) (ps pieceSum) { +func (t *Torrent) HashPiece(piece peer_protocol.Integer) (ps pieceSum) { hash := PieceHash.New() n, err := t.Data.WriteSectionTo(hash, int64(piece)*t.MetaInfo.PieceLength, t.MetaInfo.PieceLength) if err != nil { @@ -278,7 +334,7 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er err = errors.New("unknown torrent") return } - index := int(off / int64(t.PieceLength(0))) + index := peer_protocol.Integer(off / int64(t.PieceLength(0))) piece := t.Pieces[index] pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0))) switch piece.State { @@ -317,6 +373,9 @@ func (c *Client) Start() { c.addTorrent = make(chan *Torrent) c.torrentFinished = make(chan InfoHash) c.actorTask = make(chan func()) + if c.HalfOpenLimit == 0 { + c.HalfOpenLimit = 10 + } o := copy(c.PeerId[:], BEP20) _, err := rand.Read(c.PeerId[o:]) if err != nil { @@ -422,6 +481,7 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) { write: make(chan []byte), post: make(chan encoding.BinaryMarshaler), } + defer conn.Close() go conn.writer() go conn.writeOptimizer() conn.post <- peer_protocol.Bytes(peer_protocol.Protocol) @@ -432,11 +492,16 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) { } var b [28]byte _, err := io.ReadFull(conn.Socket, b[:]) - if err != nil { - log.Fatal(err) + switch err { + case nil: + case io.EOF: + return + default: + err = fmt.Errorf("when reading protocol and extensions: %s", err) + return } if string(b[:20]) != peer_protocol.Protocol { - log.Printf("wrong protocol: %#v", string(b[:20])) + err = fmt.Errorf("wrong protocol: %#v", string(b[:20])) return } if 8 != copy(conn.PeerExtensions[:], b[20:]) { @@ -460,6 +525,7 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) { conn.post <- peer_protocol.Bytes(torrent.InfoHash[:]) conn.post <- peer_protocol.Bytes(me.PeerId[:]) } + done := make(chan struct{}) me.withContext(func() { me.addConnection(torrent, conn) if torrent.haveAnyPieces() { @@ -468,16 +534,16 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) { Bitfield: torrent.bitfield(), }) } - go func() { - defer me.withContext(func() { - me.dropConnection(torrent, conn) - }) - err := me.runConnection(torrent, conn) - if err != nil { - log.Print(err) - } - }() + close(done) }) + <-done + defer me.withContext(func() { + me.dropConnection(torrent, conn) + }) + err = me.runConnection(torrent, conn) + if err != nil { + log.Print(err) + } } func (me *Client) peerGotPiece(torrent *Torrent, conn *connection, piece int) { @@ -486,7 +552,6 @@ func (me *Client) peerGotPiece(torrent *Torrent, conn *connection, piece int) { } conn.PeerPieces[piece] = true if torrent.wantPiece(piece) { - conn.SetInterested(true) me.replenishConnRequests(torrent, conn) } } @@ -555,20 +620,7 @@ func (me *Client) runConnection(torrent *Torrent, conn *connection) error { break } delete(conn.Requests, request_) - if _, ok := torrent.Pieces[request_.Index].PendingChunkSpecs[request_.ChunkSpec]; !ok { - log.Printf("got unnecessary chunk: %s", request_) - break - } - err = torrent.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece) - if err != nil { - break - } - delete(torrent.Pieces[request_.Index].PendingChunkSpecs, request_.ChunkSpec) - me.downloadedChunk(torrent, request_) - if len(torrent.Pieces[request_.Index].PendingChunkSpecs) == 0 { - torrent.Pieces[request_.Index].State = pieceStateUnknown - go me.verifyPiece(torrent, int(request_.Index)) - } + err = me.downloadedChunk(torrent, msg) default: log.Printf("received unknown message type: %#v", msg.Type) } @@ -669,32 +721,56 @@ func (me *Client) withContext(f func()) { } func (me *Client) replenishConnRequests(torrent *Torrent, conn *connection) { - if len(conn.Requests) >= maxRequests { - return - } - if conn.PeerChoked { + requestHeatMap := torrent.requestHeat() + if torrent.Priorities == nil { return } - requestHeatMap := torrent.requestHeat() - for index, has := range conn.PeerPieces { - if !has { + for e := torrent.Priorities.Front(); e != nil; e = e.Next() { + req := e.Value.(Request) + if !conn.PeerPieces[req.Index] { continue } - for chunkSpec, _ := range torrent.Pieces[index].PendingChunkSpecs { - request := Request{peer_protocol.Integer(index), chunkSpec} - if heat := requestHeatMap[request]; heat > 0 { - continue - } - conn.SetInterested(true) - if !conn.Request(request) { - return - } + switch torrent.Pieces[req.Index].State { + case pieceStateUnknown: + continue + case pieceStateIncomplete: + default: + panic("prioritized chunk for invalid piece state") + } + if requestHeatMap[req] > 0 { + continue + } + if !conn.Request(req) { + break } } - //conn.SetInterested(false) } -func (me *Client) downloadedChunk(t *Torrent, chunk Request) { +func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message) (err error) { + request := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}} + if _, ok := torrent.Pieces[request.Index].PendingChunkSpecs[request.ChunkSpec]; !ok { + log.Printf("got unnecessary chunk: %s", request) + return + } + err = torrent.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece) + if err != nil { + return + } + delete(torrent.Pieces[request.Index].PendingChunkSpecs, request.ChunkSpec) + if len(torrent.Pieces[request.Index].PendingChunkSpecs) == 0 { + torrent.Pieces[request.Index].State = pieceStateUnknown + go me.verifyPiece(torrent, request.Index) + return + } + var next *list.Element + for e := torrent.Priorities.Front(); e != nil; e = next { + next = e.Next() + if e.Value.(Request) == request { + torrent.Priorities.Remove(e) + } + } + me.dataReady(DataSpec{torrent.InfoHash, request}) + return } func (cl *Client) dataReady(ds DataSpec) { @@ -706,7 +782,7 @@ func (cl *Client) dataReady(ds DataSpec) { }() } -func (me *Client) pieceHashed(ih InfoHash, piece int, correct bool) { +func (me *Client) pieceHashed(ih InfoHash, piece peer_protocol.Integer, correct bool) { torrent := me.torrents[ih] newState := func() pieceState { if correct { @@ -715,16 +791,24 @@ func (me *Client) pieceHashed(ih InfoHash, piece int, correct bool) { return pieceStateIncomplete } }() - oldState := torrent.Pieces[piece].State - if newState == oldState { - return - } + // oldState := torrent.Pieces[piece].State + // if newState == oldState { + // return + // } torrent.Pieces[piece].State = newState switch newState { case pieceStateIncomplete: - torrent.Pieces[piece].PendingChunkSpecs = torrent.pieceChunkSpecs(piece) + torrent.Pieces[int(piece)].PendingChunkSpecs = torrent.pieceChunkSpecs(piece) case pieceStateComplete: - log.Print(piece) + var next *list.Element + if torrent.Priorities != nil { + for e := torrent.Priorities.Front(); e != nil; e = next { + next = e.Next() + if e.Value.(Request).Index == piece { + torrent.Priorities.Remove(e) + } + } + } me.dataReady(DataSpec{ torrent.InfoHash, Request{ @@ -747,11 +831,14 @@ func (me *Client) pieceHashed(ih InfoHash, piece int, correct bool) { } } -func (me *Client) verifyPiece(torrent *Torrent, index int) { +func (me *Client) verifyPiece(torrent *Torrent, index peer_protocol.Integer) { sum := torrent.HashPiece(index) + done := make(chan struct{}) me.withContext(func() { me.pieceHashed(torrent.InfoHash, index, sum == torrent.Pieces[index].Hash) + close(done) }) + <-done } func (me *Client) run() { @@ -769,7 +856,7 @@ func (me *Client) run() { me.torrents[torrent.InfoHash] = torrent go func() { for index := range torrent.Pieces { - me.verifyPiece(torrent, index) + me.verifyPiece(torrent, peer_protocol.Integer(index)) } }() case infoHash := <-me.torrentFinished: @@ -791,3 +878,16 @@ func (me *Client) Torrents() (ret []*Torrent) { <-done return } + +func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) { + done := make(chan struct{}) + cl.withContext(func() { + t := cl.torrent(ih) + t.PrioritizeDataRegion(off, len_) + for _, cn := range t.Conns { + cl.replenishConnRequests(t, cn) + } + close(done) + }) + <-done +} diff --git a/cmd/torrentfs/main.go b/cmd/torrentfs/main.go index 51cd516b..8689285e 100644 --- a/cmd/torrentfs/main.go +++ b/cmd/torrentfs/main.go @@ -8,13 +8,13 @@ import ( "github.com/davecheney/profile" metainfo "github.com/nsf/libtorgo/torrent" "log" + "net" "net/http" _ "net/http/pprof" "os" "os/user" "path/filepath" "sync" - "syscall" ) var ( @@ -48,7 +48,6 @@ type TorrentFS struct { func (tfs *TorrentFS) publishData() { for { spec := <-tfs.Client.DataReady - log.Printf("ready data: %s", spec) tfs.Lock() for ds := range tfs.DataSubs { ds <- spec @@ -110,11 +109,15 @@ func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fus if int64(req.Size) < _len { return req.Size } else { + // limit read to the end of the file return int(_len) } }()) + infoHash := torrent.BytesInfoHash(fn.metaInfo.InfoHash) + torrentOff := fn.TorrentOffset + req.Offset + fn.FS.Client.PrioritizeDataRegion(infoHash, torrentOff, int64(len(data))) for { - n, err := fn.FS.Client.TorrentReadAt(torrent.BytesInfoHash(fn.metaInfo.InfoHash), fn.TorrentOffset+req.Offset, data) + n, err := fn.FS.Client.TorrentReadAt(infoHash, torrentOff, data) switch err { case nil: resp.Data = data[:n] @@ -123,7 +126,7 @@ func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fus select { case <-dataSpecs: case <-intr: - return fuse.Errno(syscall.EINTR) + return fuse.EINTR } default: log.Print(err) @@ -270,6 +273,7 @@ func (tfs *TorrentFS) Root() (fusefs.Node, fuse.Error) { func main() { pprofAddr := flag.String("pprofAddr", "", "pprof HTTP server bind address") flag.Parse() + log.SetFlags(log.LstdFlags | log.Lshortfile) if *pprofAddr != "" { go http.ListenAndServe(*pprofAddr, nil) } @@ -297,6 +301,10 @@ func main() { if err != nil { log.Print(err) } + client.AddPeers(torrent.BytesInfoHash(metaInfo.InfoHash), []torrent.Peer{{ + IP: net.IPv4(127, 0, 0, 1), + Port: 3000, + }}) } conn, err := fuse.Mount(mountDir) if err != nil {