]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add Client.DownloadStrategy; fix priority of incomplete pieces
authorMatt Joiner <anacrolix@gmail.com>
Wed, 21 May 2014 07:40:54 +0000 (17:40 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 21 May 2014 07:40:54 +0000 (17:40 +1000)
client.go
cmd/torrentfs/main.go
fs/torrentfs_test.go
torrent.go

index 3e0277876bb8e27cbab27867c35c2cb59ae927ce..2ae3b8321858160b9e7a460d119d5d5f4b698bb7 100644 (file)
--- a/client.go
+++ b/client.go
@@ -94,11 +94,12 @@ type dataSpec struct {
 }
 
 type Client struct {
-       DataDir         string
-       HalfOpenLimit   int
-       PeerId          [20]byte
-       Listener        net.Listener
-       DisableTrackers bool
+       DataDir          string
+       HalfOpenLimit    int
+       PeerId           [20]byte
+       Listener         net.Listener
+       DisableTrackers  bool
+       DownloadStrategy DownloadStrategy
 
        mu    sync.Mutex
        event sync.Cond
@@ -173,6 +174,9 @@ func (c *Client) Start() {
                panic("error generating peer id")
        }
        c.quit = make(chan struct{})
+       if c.DownloadStrategy == nil {
+               c.DownloadStrategy = DefaultDownloadStrategy{}
+       }
        if c.Listener != nil {
                go c.acceptConnections()
        }
@@ -672,7 +676,13 @@ func (me *Client) WaitAll() bool {
        return true
 }
 
-func (me *Client) replenishConnRequests(torrent *torrent, conn *connection) {
+type DownloadStrategy interface {
+       FillRequests(t *torrent, c *connection)
+}
+
+type DefaultDownloadStrategy struct{}
+
+func (DefaultDownloadStrategy) FillRequests(torrent *torrent, conn *connection) {
        requestHeatMap := torrent.requestHeat()
        addRequest := func(req request) (again bool) {
                piece := torrent.Pieces[req.Index]
@@ -697,44 +707,75 @@ func (me *Client) replenishConnRequests(torrent *torrent, conn *connection) {
                }
        }
        // Then finish off incomplete pieces in order of bytes remaining.
-       for _, index := range torrent.piecesByPendingBytesDesc() {
-               if torrent.PieceNumPendingBytes(index) == torrent.PieceLength(index) {
-                       continue
-               }
+       for _, index := range torrent.piecesByPendingBytes() {
                for chunkSpec := range torrent.Pieces[index].PendingChunkSpecs {
                        if !addRequest(request{index, chunkSpec}) {
                                return
                        }
                }
        }
-       if len(conn.Requests) == 0 {
-               conn.SetInterested(false)
+}
+
+type ResponsiveDownloadStrategy struct{}
+
+func (ResponsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
+       for e := t.Priorities.Front(); e != nil; e = e.Next() {
+               if !c.Request(e.Value.(request)) {
+                       break
+               }
        }
 }
 
-func (me *Client) downloadedChunk(torrent *torrent, msg *peer_protocol.Message) (err error) {
-       req := request{msg.Index, chunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
-       if _, ok := torrent.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
-               log.Printf("got unnecessary chunk: %s", req)
-               return
+func (me *Client) replenishConnRequests(t *torrent, c *connection) {
+       me.DownloadStrategy.FillRequests(t, c)
+       if len(c.Requests) == 0 {
+               c.SetInterested(false)
        }
-       err = torrent.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
+}
+
+func (me *Client) downloadedChunk(t *torrent, c *connection, msg *peer_protocol.Message) error {
+       req := newRequest(msg.Index, msg.Begin, peer_protocol.Integer(len(msg.Piece)))
+
+       // Request has been satisfied.
+       delete(c.Requests, req)
+
+       defer me.replenishConnRequests(t, c)
+
+       // Do we actually want this chunk?
+       if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
+               return nil
+       }
+
+       // Write the chunk out.
+       err := t.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
        if err != nil {
-               return
+               return err
+       }
+       me.dataReady(dataSpec{t.InfoHash, req})
+
+       // Cancel pending requests for this chunk.
+       for _, c := range t.Conns {
+               if c.Cancel(req) {
+                       me.replenishConnRequests(t, c)
+               }
        }
-       delete(torrent.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
-       if len(torrent.Pieces[req.Index].PendingChunkSpecs) == 0 {
-               me.queuePieceCheck(torrent, req.Index)
+
+       // Record that we have the chunk.
+       delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
+       if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
+               me.queuePieceCheck(t, req.Index)
        }
+
+       // Unprioritize the chunk.
        var next *list.Element
-       for e := torrent.Priorities.Front(); e != nil; e = next {
+       for e := t.Priorities.Front(); e != nil; e = next {
                next = e.Next()
                if e.Value.(request) == req {
-                       torrent.Priorities.Remove(e)
+                       t.Priorities.Remove(e)
                }
        }
-       me.dataReady(dataSpec{torrent.InfoHash, req})
-       return
+
+       return nil
 }
 
 func (cl *Client) dataReady(ds dataSpec) {
index 5233b22b04bf014c2ae67e62bb7487919658e4f3..8ffeee73719b94ba5a4d1b748b26ab56c41649bf 100644 (file)
@@ -131,8 +131,9 @@ func main() {
        setSignalHandlers()
        defer conn.Close()
        client := &torrent.Client{
-               DataDir:         downloadDir,
-               DisableTrackers: *disableTrackers,
+               DataDir:          downloadDir,
+               DisableTrackers:  *disableTrackers,
+               DownloadStrategy: torrent.ResponsiveDownloadStrategy{},
        }
        client.Start()
        addTorrentDir(client, torrentPath)
index a1ae3f8295035528b3bb19cdd270eb42261826ef..47662c2502144f4f829e94714489bad476a93b29 100644 (file)
@@ -138,7 +138,8 @@ func TestDownloadOnDemand(t *testing.T) {
        defer seeder.Stop()
        seeder.AddTorrent(layout.Metainfo)
        leecher := torrent.Client{
-               DataDir: filepath.Join(layout.BaseDir, "download"),
+               DataDir:          filepath.Join(layout.BaseDir, "download"),
+               DownloadStrategy: torrent.ResponsiveDownloadStrategy{},
        }
        leecher.Start()
        defer leecher.Stop()
index c747986e81558f228fa52bed95631d21099ffeb0..08bce0b8bda99c68e2dc28bb161db806b6e59831 100644 (file)
@@ -66,7 +66,7 @@ func (t *torrent) Close() (err error) {
        return
 }
 
-func (t *torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) {
+func (t *torrent) piecesByPendingBytes() (indices []peer_protocol.Integer) {
        slice := pieceByBytesPendingSlice{
                Pending: make([]peer_protocol.Integer, 0, len(t.Pieces)),
                Indices: make([]peer_protocol.Integer, 0, len(t.Pieces)),