]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Implement prioritizing of torrent data regions based on FS activity
authorMatt Joiner <anacrolix@gmail.com>
Mon, 14 Oct 2013 14:39:12 +0000 (01:39 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 14 Oct 2013 14:39:12 +0000 (01:39 +1100)
client.go
cmd/torrentfs/main.go

index 3e92663d4f652f224b668b0fe6f4c8a66d4dab65..5509da6f03bfbd4ceb203dc7ae9cdcc0ecb7f740 100644 (file)
--- a/client.go
+++ b/client.go
@@ -82,7 +82,12 @@ type connection struct {
        PeerPieces     []bool
 }
 
-func (c *connection) PeerHasPiece(index int) bool {
+func (c *connection) Close() {
+       c.Socket.Close()
+       close(c.post)
+}
+
+func (c *connection) PeerHasPiece(index peer_protocol.Integer) bool {
        if c.PeerPieces == nil {
                return false
        }
@@ -97,6 +102,10 @@ func (c *connection) Request(chunk Request) bool {
        if len(c.Requests) >= maxRequests {
                return false
        }
+       c.SetInterested(true)
+       if c.PeerChoked {
+               return false
+       }
        if _, ok := c.Requests[chunk]; !ok {
                c.Post(peer_protocol.Message{
                        Type:   peer_protocol.Request,
@@ -131,10 +140,12 @@ func (c *connection) SetInterested(interested bool) {
 func (conn *connection) writer() {
        for {
                b := <-conn.write
+               if b == nil {
+                       break
+               }
                n, err := conn.Socket.Write(b)
                if err != nil {
                        log.Print(err)
-                       close(conn.write)
                        break
                }
                if n != len(b) {
@@ -147,6 +158,7 @@ func (conn *connection) writer() {
 func (conn *connection) writeOptimizer() {
        pending := list.New()
        var nextWrite []byte
+       defer close(conn.write)
        for {
                write := conn.write
                if pending.Len() == 0 {
@@ -159,7 +171,10 @@ func (conn *connection) writeOptimizer() {
                        }
                }
                select {
-               case msg := <-conn.post:
+               case msg, ok := <-conn.post:
+                       if !ok {
+                               return
+                       }
                        pending.PushBack(msg)
                case write <- nextWrite:
                        pending.Remove(pending.Front())
@@ -168,12 +183,53 @@ func (conn *connection) writeOptimizer() {
 }
 
 type Torrent struct {
-       InfoHash InfoHash
-       Pieces   []piece
-       Data     MMapSpan
-       MetaInfo *metainfo.MetaInfo
-       Conns    []*connection
-       Peers    []Peer
+       InfoHash   InfoHash
+       Pieces     []piece
+       Data       MMapSpan
+       MetaInfo   *metainfo.MetaInfo
+       Conns      []*connection
+       Peers      []Peer
+       Priorities *list.List
+}
+
+func (t *Torrent) PrioritizeDataRegion(off, len_ int64) {
+       newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize)
+       for len_ > 0 {
+               index := peer_protocol.Integer(off / int64(t.PieceLength(0)))
+               pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
+               piece := t.Pieces[index]
+               if piece.State == pieceStateComplete {
+                       adv := int64(t.PieceLength(index) - pieceOff)
+                       off += adv
+                       len_ -= adv
+                       continue
+               }
+               chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize}
+               adv := int64(chunkSize - pieceOff%chunkSize)
+               off += adv
+               len_ -= adv
+               switch piece.State {
+               case pieceStateIncomplete:
+                       if _, ok := piece.PendingChunkSpecs[chunk]; !ok {
+                               continue
+                       }
+               case pieceStateUnknown:
+               default:
+                       panic("unexpected piece state")
+               }
+               newPriorities = append(newPriorities, Request{index, chunk})
+       }
+       if len(newPriorities) < 1 {
+               return
+       }
+       log.Print(newPriorities)
+       if t.Priorities == nil {
+               t.Priorities = list.New()
+       }
+       t.Priorities.PushFront(newPriorities[0])
+       for _, req := range newPriorities[1:] {
+               t.Priorities.PushBack(req)
+       }
 }
 
 func (t *Torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
@@ -188,7 +244,7 @@ func (t *Torrent) bitfield() (bf []bool) {
        return
 }
 
-func (t *Torrent) pieceChunkSpecs(index int) (cs map[ChunkSpec]struct{}) {
+func (t *Torrent) pieceChunkSpecs(index peer_protocol.Integer) (cs map[ChunkSpec]struct{}) {
        cs = make(map[ChunkSpec]struct{}, (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
        c := ChunkSpec{
                Begin: 0,
@@ -220,8 +276,8 @@ type Peer struct {
        Port int
 }
 
-func (t *Torrent) PieceLength(piece int) (len_ peer_protocol.Integer) {
-       if piece == len(t.Pieces)-1 {
+func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) {
+       if int(piece) == len(t.Pieces)-1 {
                len_ = peer_protocol.Integer(t.Data.Size() % t.MetaInfo.PieceLength)
        }
        if len_ == 0 {
@@ -230,7 +286,7 @@ func (t *Torrent) PieceLength(piece int) (len_ peer_protocol.Integer) {
        return
 }
 
-func (t *Torrent) HashPiece(piece int) (ps pieceSum) {
+func (t *Torrent) HashPiece(piece peer_protocol.Integer) (ps pieceSum) {
        hash := PieceHash.New()
        n, err := t.Data.WriteSectionTo(hash, int64(piece)*t.MetaInfo.PieceLength, t.MetaInfo.PieceLength)
        if err != nil {
@@ -278,7 +334,7 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er
                        err = errors.New("unknown torrent")
                        return
                }
-               index := int(off / int64(t.PieceLength(0)))
+               index := peer_protocol.Integer(off / int64(t.PieceLength(0)))
                piece := t.Pieces[index]
                pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
                switch piece.State {
@@ -317,6 +373,9 @@ func (c *Client) Start() {
        c.addTorrent = make(chan *Torrent)
        c.torrentFinished = make(chan InfoHash)
        c.actorTask = make(chan func())
+       if c.HalfOpenLimit == 0 {
+               c.HalfOpenLimit = 10
+       }
        o := copy(c.PeerId[:], BEP20)
        _, err := rand.Read(c.PeerId[o:])
        if err != nil {
@@ -422,6 +481,7 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) {
                write:      make(chan []byte),
                post:       make(chan encoding.BinaryMarshaler),
        }
+       defer conn.Close()
        go conn.writer()
        go conn.writeOptimizer()
        conn.post <- peer_protocol.Bytes(peer_protocol.Protocol)
@@ -432,11 +492,16 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) {
        }
        var b [28]byte
        _, err := io.ReadFull(conn.Socket, b[:])
-       if err != nil {
-               log.Fatal(err)
+       switch err {
+       case nil:
+       case io.EOF:
+               return
+       default:
+               err = fmt.Errorf("when reading protocol and extensions: %s", err)
+               return
        }
        if string(b[:20]) != peer_protocol.Protocol {
-               log.Printf("wrong protocol: %#v", string(b[:20]))
+               err = fmt.Errorf("wrong protocol: %#v", string(b[:20]))
                return
        }
        if 8 != copy(conn.PeerExtensions[:], b[20:]) {
@@ -460,6 +525,7 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) {
                conn.post <- peer_protocol.Bytes(torrent.InfoHash[:])
                conn.post <- peer_protocol.Bytes(me.PeerId[:])
        }
+       done := make(chan struct{})
        me.withContext(func() {
                me.addConnection(torrent, conn)
                if torrent.haveAnyPieces() {
@@ -468,16 +534,16 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) {
                                Bitfield: torrent.bitfield(),
                        })
                }
-               go func() {
-                       defer me.withContext(func() {
-                               me.dropConnection(torrent, conn)
-                       })
-                       err := me.runConnection(torrent, conn)
-                       if err != nil {
-                               log.Print(err)
-                       }
-               }()
+               close(done)
        })
+       <-done
+       defer me.withContext(func() {
+               me.dropConnection(torrent, conn)
+       })
+       err = me.runConnection(torrent, conn)
+       if err != nil {
+               log.Print(err)
+       }
 }
 
 func (me *Client) peerGotPiece(torrent *Torrent, conn *connection, piece int) {
@@ -486,7 +552,6 @@ func (me *Client) peerGotPiece(torrent *Torrent, conn *connection, piece int) {
        }
        conn.PeerPieces[piece] = true
        if torrent.wantPiece(piece) {
-               conn.SetInterested(true)
                me.replenishConnRequests(torrent, conn)
        }
 }
@@ -555,20 +620,7 @@ func (me *Client) runConnection(torrent *Torrent, conn *connection) error {
                                        break
                                }
                                delete(conn.Requests, request_)
-                               if _, ok := torrent.Pieces[request_.Index].PendingChunkSpecs[request_.ChunkSpec]; !ok {
-                                       log.Printf("got unnecessary chunk: %s", request_)
-                                       break
-                               }
-                               err = torrent.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
-                               if err != nil {
-                                       break
-                               }
-                               delete(torrent.Pieces[request_.Index].PendingChunkSpecs, request_.ChunkSpec)
-                               me.downloadedChunk(torrent, request_)
-                               if len(torrent.Pieces[request_.Index].PendingChunkSpecs) == 0 {
-                                       torrent.Pieces[request_.Index].State = pieceStateUnknown
-                                       go me.verifyPiece(torrent, int(request_.Index))
-                               }
+                               err = me.downloadedChunk(torrent, msg)
                        default:
                                log.Printf("received unknown message type: %#v", msg.Type)
                        }
@@ -669,32 +721,56 @@ func (me *Client) withContext(f func()) {
 }
 
 func (me *Client) replenishConnRequests(torrent *Torrent, conn *connection) {
-       if len(conn.Requests) >= maxRequests {
-               return
-       }
-       if conn.PeerChoked {
+       requestHeatMap := torrent.requestHeat()
+       if torrent.Priorities == nil {
                return
        }
-       requestHeatMap := torrent.requestHeat()
-       for index, has := range conn.PeerPieces {
-               if !has {
+       for e := torrent.Priorities.Front(); e != nil; e = e.Next() {
+               req := e.Value.(Request)
+               if !conn.PeerPieces[req.Index] {
                        continue
                }
-               for chunkSpec, _ := range torrent.Pieces[index].PendingChunkSpecs {
-                       request := Request{peer_protocol.Integer(index), chunkSpec}
-                       if heat := requestHeatMap[request]; heat > 0 {
-                               continue
-                       }
-                       conn.SetInterested(true)
-                       if !conn.Request(request) {
-                               return
-                       }
+               switch torrent.Pieces[req.Index].State {
+               case pieceStateUnknown:
+                       continue
+               case pieceStateIncomplete:
+               default:
+                       panic("prioritized chunk for invalid piece state")
+               }
+               if requestHeatMap[req] > 0 {
+                       continue
+               }
+               if !conn.Request(req) {
+                       break
                }
        }
-       //conn.SetInterested(false)
 }
 
-func (me *Client) downloadedChunk(t *Torrent, chunk Request) {
+func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message) (err error) {
+       request := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
+       if _, ok := torrent.Pieces[request.Index].PendingChunkSpecs[request.ChunkSpec]; !ok {
+               log.Printf("got unnecessary chunk: %s", request)
+               return
+       }
+       err = torrent.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
+       if err != nil {
+               return
+       }
+       delete(torrent.Pieces[request.Index].PendingChunkSpecs, request.ChunkSpec)
+       if len(torrent.Pieces[request.Index].PendingChunkSpecs) == 0 {
+               torrent.Pieces[request.Index].State = pieceStateUnknown
+               go me.verifyPiece(torrent, request.Index)
+               return
+       }
+       var next *list.Element
+       for e := torrent.Priorities.Front(); e != nil; e = next {
+               next = e.Next()
+               if e.Value.(Request) == request {
+                       torrent.Priorities.Remove(e)
+               }
+       }
+       me.dataReady(DataSpec{torrent.InfoHash, request})
+       return
 }
 
 func (cl *Client) dataReady(ds DataSpec) {
@@ -706,7 +782,7 @@ func (cl *Client) dataReady(ds DataSpec) {
        }()
 }
 
-func (me *Client) pieceHashed(ih InfoHash, piece int, correct bool) {
+func (me *Client) pieceHashed(ih InfoHash, piece peer_protocol.Integer, correct bool) {
        torrent := me.torrents[ih]
        newState := func() pieceState {
                if correct {
@@ -715,16 +791,24 @@ func (me *Client) pieceHashed(ih InfoHash, piece int, correct bool) {
                        return pieceStateIncomplete
                }
        }()
-       oldState := torrent.Pieces[piece].State
-       if newState == oldState {
-               return
-       }
+       // oldState := torrent.Pieces[piece].State
+       // if newState == oldState {
+       //      return
+       // }
        torrent.Pieces[piece].State = newState
        switch newState {
        case pieceStateIncomplete:
-               torrent.Pieces[piece].PendingChunkSpecs = torrent.pieceChunkSpecs(piece)
+               torrent.Pieces[int(piece)].PendingChunkSpecs = torrent.pieceChunkSpecs(piece)
        case pieceStateComplete:
-               log.Print(piece)
+               var next *list.Element
+               if torrent.Priorities != nil {
+                       for e := torrent.Priorities.Front(); e != nil; e = next {
+                               next = e.Next()
+                               if e.Value.(Request).Index == piece {
+                                       torrent.Priorities.Remove(e)
+                               }
+                       }
+               }
                me.dataReady(DataSpec{
                        torrent.InfoHash,
                        Request{
@@ -747,11 +831,14 @@ func (me *Client) pieceHashed(ih InfoHash, piece int, correct bool) {
        }
 }
 
-func (me *Client) verifyPiece(torrent *Torrent, index int) {
+func (me *Client) verifyPiece(torrent *Torrent, index peer_protocol.Integer) {
        sum := torrent.HashPiece(index)
+       done := make(chan struct{})
        me.withContext(func() {
                me.pieceHashed(torrent.InfoHash, index, sum == torrent.Pieces[index].Hash)
+               close(done)
        })
+       <-done
 }
 
 func (me *Client) run() {
@@ -769,7 +856,7 @@ func (me *Client) run() {
                        me.torrents[torrent.InfoHash] = torrent
                        go func() {
                                for index := range torrent.Pieces {
-                                       me.verifyPiece(torrent, index)
+                                       me.verifyPiece(torrent, peer_protocol.Integer(index))
                                }
                        }()
                case infoHash := <-me.torrentFinished:
@@ -791,3 +878,16 @@ func (me *Client) Torrents() (ret []*Torrent) {
        <-done
        return
 }
+
+func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
+       done := make(chan struct{})
+       cl.withContext(func() {
+               t := cl.torrent(ih)
+               t.PrioritizeDataRegion(off, len_)
+               for _, cn := range t.Conns {
+                       cl.replenishConnRequests(t, cn)
+               }
+               close(done)
+       })
+       <-done
+}
index 51cd516b9de0f260508db81770094142c3b78114..8689285eebdac7200fda25f9e19ceaacaabe4722 100644 (file)
@@ -8,13 +8,13 @@ import (
        "github.com/davecheney/profile"
        metainfo "github.com/nsf/libtorgo/torrent"
        "log"
+       "net"
        "net/http"
        _ "net/http/pprof"
        "os"
        "os/user"
        "path/filepath"
        "sync"
-       "syscall"
 )
 
 var (
@@ -48,7 +48,6 @@ type TorrentFS struct {
 func (tfs *TorrentFS) publishData() {
        for {
                spec := <-tfs.Client.DataReady
-               log.Printf("ready data: %s", spec)
                tfs.Lock()
                for ds := range tfs.DataSubs {
                        ds <- spec
@@ -110,11 +109,15 @@ func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fus
                if int64(req.Size) < _len {
                        return req.Size
                } else {
+                       // limit read to the end of the file
                        return int(_len)
                }
        }())
+       infoHash := torrent.BytesInfoHash(fn.metaInfo.InfoHash)
+       torrentOff := fn.TorrentOffset + req.Offset
+       fn.FS.Client.PrioritizeDataRegion(infoHash, torrentOff, int64(len(data)))
        for {
-               n, err := fn.FS.Client.TorrentReadAt(torrent.BytesInfoHash(fn.metaInfo.InfoHash), fn.TorrentOffset+req.Offset, data)
+               n, err := fn.FS.Client.TorrentReadAt(infoHash, torrentOff, data)
                switch err {
                case nil:
                        resp.Data = data[:n]
@@ -123,7 +126,7 @@ func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fus
                        select {
                        case <-dataSpecs:
                        case <-intr:
-                               return fuse.Errno(syscall.EINTR)
+                               return fuse.EINTR
                        }
                default:
                        log.Print(err)
@@ -270,6 +273,7 @@ func (tfs *TorrentFS) Root() (fusefs.Node, fuse.Error) {
 func main() {
        pprofAddr := flag.String("pprofAddr", "", "pprof HTTP server bind address")
        flag.Parse()
+       log.SetFlags(log.LstdFlags | log.Lshortfile)
        if *pprofAddr != "" {
                go http.ListenAndServe(*pprofAddr, nil)
        }
@@ -297,6 +301,10 @@ func main() {
                if err != nil {
                        log.Print(err)
                }
+               client.AddPeers(torrent.BytesInfoHash(metaInfo.InfoHash), []torrent.Peer{{
+                       IP:   net.IPv4(127, 0, 0, 1),
+                       Port: 3000,
+               }})
        }
        conn, err := fuse.Mount(mountDir)
        if err != nil {