]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Always finish off partially downloaded pieces
authorMatt Joiner <anacrolix@gmail.com>
Tue, 15 Oct 2013 08:42:30 +0000 (19:42 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 15 Oct 2013 08:42:30 +0000 (19:42 +1100)
client.go
cmd/torrentfs/main.go

index 5509da6f03bfbd4ceb203dc7ae9cdcc0ecb7f740..f6d07a7b58c40b86a3afa439e05917a00fe2e783 100644 (file)
--- a/client.go
+++ b/client.go
@@ -16,6 +16,7 @@ import (
        "net"
        "os"
        "path/filepath"
+       "sort"
 )
 
 const (
@@ -56,6 +57,13 @@ type piece struct {
        PendingChunkSpecs map[ChunkSpec]struct{}
 }
 
+func (p piece) NumPendingBytes() (count peer_protocol.Integer) {
+       for cs, _ := range p.PendingChunkSpecs {
+               count += cs.Length
+       }
+       return
+}
+
 type ChunkSpec struct {
        Begin, Length peer_protocol.Integer
 }
@@ -192,6 +200,35 @@ type Torrent struct {
        Priorities *list.List
 }
 
+type pieceByBytesPendingSlice struct {
+       Torrent *Torrent
+       Indices []peer_protocol.Integer
+}
+
+func (pcs pieceByBytesPendingSlice) Len() int {
+       return len(pcs.Indices)
+}
+
+func (me pieceByBytesPendingSlice) Less(i, j int) bool {
+       return me.Torrent.Pieces[i].NumPendingBytes() < me.Torrent.Pieces[j].NumPendingBytes()
+}
+
+func (me pieceByBytesPendingSlice) Swap(i, j int) {
+       me.Indices[i], me.Indices[j] = me.Indices[j], me.Indices[i]
+}
+
+func (t *Torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) {
+       slice := pieceByBytesPendingSlice{
+               Torrent: t,
+               Indices: make([]peer_protocol.Integer, 0, len(t.Pieces)),
+       }
+       for i := range t.Pieces {
+               slice.Indices = append(slice.Indices, peer_protocol.Integer(i))
+       }
+       sort.Sort(sort.Reverse(slice))
+       return slice.Indices
+}
+
 func (t *Torrent) PrioritizeDataRegion(off, len_ int64) {
        newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize)
        for len_ > 0 {
@@ -363,7 +400,6 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er
                n, err = t.Data.ReadAt(p, off)
        })
        <-done
-       log.Println(n, err)
        return
 }
 
@@ -460,7 +496,10 @@ func (me *Client) initiateConn(peer Peer, torrent *Torrent) {
                        return
                }
                log.Printf("connected to %s", conn.RemoteAddr())
-               me.handshake(conn, torrent, peer.Id)
+               err = me.handshake(conn, torrent, peer.Id)
+               if err != nil {
+                       log.Print(err)
+               }
        }()
 }
 
@@ -473,7 +512,7 @@ func (me *Torrent) haveAnyPieces() bool {
        return false
 }
 
-func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) {
+func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) (err error) {
        conn := &connection{
                Socket:     sock,
                Choked:     true,
@@ -491,7 +530,7 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) {
                conn.post <- peer_protocol.Bytes(me.PeerId[:])
        }
        var b [28]byte
-       _, err := io.ReadFull(conn.Socket, b[:])
+       _, err = io.ReadFull(conn.Socket, b[:])
        switch err {
        case nil:
        case io.EOF:
@@ -541,9 +580,7 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) {
                me.dropConnection(torrent, conn)
        })
        err = me.runConnection(torrent, conn)
-       if err != nil {
-               log.Print(err)
-       }
+       return
 }
 
 func (me *Client) peerGotPiece(torrent *Torrent, conn *connection, piece int) {
@@ -584,6 +621,7 @@ func (me *Client) runConnection(torrent *Torrent, conn *connection) error {
                        switch msg.Type {
                        case peer_protocol.Choke:
                                conn.PeerChoked = true
+                               conn.Requests = nil
                        case peer_protocol.Unchoke:
                                conn.PeerChoked = false
                                me.peerUnchoked(torrent, conn)
@@ -722,26 +760,33 @@ func (me *Client) withContext(f func()) {
 
 func (me *Client) replenishConnRequests(torrent *Torrent, conn *connection) {
        requestHeatMap := torrent.requestHeat()
-       if torrent.Priorities == nil {
-               return
-       }
-       for e := torrent.Priorities.Front(); e != nil; e = e.Next() {
-               req := e.Value.(Request)
+       addRequest := func(req Request) (again bool) {
                if !conn.PeerPieces[req.Index] {
-                       continue
+                       return true
                }
-               switch torrent.Pieces[req.Index].State {
-               case pieceStateUnknown:
-                       continue
-               case pieceStateIncomplete:
-               default:
-                       panic("prioritized chunk for invalid piece state")
+               if torrent.Pieces[req.Index].State != pieceStateIncomplete {
+                       return true
                }
                if requestHeatMap[req] > 0 {
+                       return true
+               }
+               return conn.Request(req)
+       }
+       if torrent.Priorities != nil {
+               for e := torrent.Priorities.Front(); e != nil; e = e.Next() {
+                       if !addRequest(e.Value.(Request)) {
+                               return
+                       }
+               }
+       }
+       for _, index := range torrent.piecesByPendingBytesDesc() {
+               if torrent.Pieces[index].NumPendingBytes() == torrent.PieceLength(index) {
                        continue
                }
-               if !conn.Request(req) {
-                       break
+               for chunkSpec := range torrent.Pieces[index].PendingChunkSpecs {
+                       if !addRequest(Request{index, chunkSpec}) {
+                               return
+                       }
                }
        }
 }
@@ -752,6 +797,7 @@ func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message)
                log.Printf("got unnecessary chunk: %s", request)
                return
        }
+       log.Printf("got chunk %s", request)
        err = torrent.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
        if err != nil {
                return
@@ -816,6 +862,7 @@ func (me *Client) pieceHashed(ih InfoHash, piece peer_protocol.Integer, correct
                                ChunkSpec{0, peer_protocol.Integer(torrent.PieceLength(piece))},
                        },
                })
+               torrent.Pieces[piece].PendingChunkSpecs = nil
        }
        for _, conn := range torrent.Conns {
                if correct {
index 8689285eebdac7200fda25f9e19ceaacaabe4722..9ea5ab5bcf18e491fccc58cb461dd2b9a71f156f 100644 (file)
@@ -118,6 +118,7 @@ func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fus
        fn.FS.Client.PrioritizeDataRegion(infoHash, torrentOff, int64(len(data)))
        for {
                n, err := fn.FS.Client.TorrentReadAt(infoHash, torrentOff, data)
+               // log.Println(torrentOff, len(data), n, err)
                switch err {
                case nil:
                        resp.Data = data[:n]