]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Got a nice working algorithm for responsive download strategy
authorMatt Joiner <anacrolix@gmail.com>
Sat, 23 Aug 2014 17:08:11 +0000 (03:08 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sat, 23 Aug 2014 17:08:11 +0000 (03:08 +1000)
client.go
cmd/torrentfs/main.go
download_strategies.go

index b1dc099e0236aa9b92039692d0dbf25e20432bfe..7a11c24a79371eb18a7f9a0295baf0f220f02643 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1228,6 +1228,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
        // Record that we have the chunk.
        delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
        t.PiecesByBytesLeft.ValueChanged(t.Pieces[req.Index].bytesLeftElement)
+       me.dataReady(dataSpec{t.InfoHash, req})
        if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
                me.queuePieceCheck(t, req.Index)
        }
@@ -1236,18 +1237,14 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
        me.downloadStrategy.TorrentGotChunk(t, req)
 
        // Cancel pending requests for this chunk.
-       cancelled := false
        for _, c := range t.Conns {
                if me.connCancel(t, c, req) {
-                       cancelled = true
                        me.replenishConnRequests(t, c)
                }
        }
-       if cancelled {
-               log.Printf("cancelled concurrent requests for %v", req)
-       }
 
-       me.dataReady(dataSpec{t.InfoHash, req})
+       me.downloadStrategy.AssertNotRequested(t, req)
+
        return nil
 }
 
index 1b3d9797639aaab0875fa192c9533a3d9c0dedff..d542c7172e81f03c94ac39404545d99db1a0a4bc 100644 (file)
@@ -29,7 +29,7 @@ var (
        disableTrackers = flag.Bool("disableTrackers", false, "disables trackers")
        testPeer        = flag.String("testPeer", "", "the address for a test peer")
        httpAddr        = flag.String("httpAddr", "localhost:0", "HTTP server bind address")
-       readaheadBytes  = flag.Int("readaheadBytes", 10*1024*1024, "bytes to readahead in each torrent from the last read piece")
+       readaheadBytes  = flag.Int64("readaheadBytes", 10*1024*1024, "bytes to readahead in each torrent from the last read piece")
        testPeerAddr    *net.TCPAddr
        listenAddr      = flag.String("listenAddr", ":6882", "incoming connection address")
 )
index 476f607c734f5685acd332a26c7d2a6971bff771..db31bc3aec46599e12fe7b5764c8c90b76901867 100644 (file)
@@ -16,12 +16,19 @@ type DownloadStrategy interface {
        TorrentGotChunk(t *torrent, r request)
        TorrentGotPiece(t *torrent, piece int)
        WriteStatus(w io.Writer)
+       AssertNotRequested(*torrent, request)
 }
 
 type DefaultDownloadStrategy struct {
        heat map[*torrent]map[request]int
 }
 
+func (me *DefaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {
+       if me.heat[t][r] != 0 {
+               panic("outstanding requests break invariant")
+       }
+}
+
 func (me *DefaultDownloadStrategy) WriteStatus(w io.Writer) {}
 
 func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
@@ -100,20 +107,22 @@ func (me *DefaultDownloadStrategy) TorrentGotChunk(t *torrent, c request)      {
 func (me *DefaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int)      {}
 func (*DefaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {}
 
-func NewResponsiveDownloadStrategy(readahead int) *responsiveDownloadStrategy {
+func NewResponsiveDownloadStrategy(readahead int64) *responsiveDownloadStrategy {
        return &responsiveDownloadStrategy{
-               Readahead:     readahead,
-               lastReadPiece: make(map[*torrent]int),
-               priorities:    make(map[*torrent]map[request]struct{}),
+               Readahead:      readahead,
+               lastReadOffset: make(map[*torrent]int64),
+               priorities:     make(map[*torrent]map[request]struct{}),
+               requestHeat:    make(map[*torrent]map[request]int),
        }
 }
 
 type responsiveDownloadStrategy struct {
        // How many bytes to preemptively download starting at the beginning of
        // the last piece read for a given torrent.
-       Readahead     int
-       lastReadPiece map[*torrent]int
-       priorities    map[*torrent]map[request]struct{}
+       Readahead      int64
+       lastReadOffset map[*torrent]int64
+       priorities     map[*torrent]map[request]struct{}
+       requestHeat    map[*torrent]map[request]int
 }
 
 func (me *responsiveDownloadStrategy) WriteStatus(w io.Writer) {
@@ -129,69 +138,69 @@ func (me *responsiveDownloadStrategy) WriteStatus(w io.Writer) {
 
 func (me *responsiveDownloadStrategy) TorrentStarted(t *torrent) {
        me.priorities[t] = make(map[request]struct{})
+       me.requestHeat[t] = make(map[request]int)
 }
 
 func (me *responsiveDownloadStrategy) TorrentStopped(t *torrent) {
-       delete(me.lastReadPiece, t)
+       delete(me.lastReadOffset, t)
        delete(me.priorities, t)
 }
-func (responsiveDownloadStrategy) DeleteRequest(*torrent, request) {}
+func (me *responsiveDownloadStrategy) DeleteRequest(t *torrent, r request) {
+       rh := me.requestHeat[t]
+       if rh[r] <= 0 {
+               panic("request heat invariant broken")
+       }
+       rh[r]--
+}
 
 func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
+       requestWrapper := func(req request) bool {
+               if c.RequestPending(req) {
+                       return true
+               }
+               again := c.Request(req)
+               if c.RequestPending(req) {
+                       me.requestHeat[t][req]++
+               }
+               return again
+       }
+
        for req := range me.priorities[t] {
                if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
                        panic(req)
                }
-               if !c.Request(req) {
+               if !requestWrapper(req) {
                        return
                }
        }
 
-       if len(c.Requests) >= 32 {
+       if len(c.Requests) >= 16 {
                return
        }
 
-       // Short circuit request fills at a level that might reduce receiving of
-       // unnecessary chunks.
-       requestWrapper := func(r request) bool {
-               if len(c.Requests) >= 64 {
-                       return false
-               }
-               return c.Request(r)
-       }
-
-       requestPiece := func(piece int) bool {
-               if piece >= t.NumPieces() {
-                       return true
-               }
-               for _, cs := range t.Pieces[piece].shuffledPendingChunkSpecs() {
-                       if !requestWrapper(request{pp.Integer(piece), cs}) {
+       requestWrapper = func() func(request) bool {
+               f := requestWrapper
+               return func(req request) bool {
+                       if len(c.Requests) >= 32 {
                                return false
                        }
+                       return f(req)
                }
-               return true
-       }
+       }()
 
-       if lastReadPiece, ok := me.lastReadPiece[t]; ok {
-               readaheadPieces := (me.Readahead + t.UsualPieceSize() - 1) / t.UsualPieceSize()
-               for i := 0; i < readaheadPieces; i++ {
-                       if !requestPiece(lastReadPiece + i) {
-                               return
+       if lastReadOffset, ok := me.lastReadOffset[t]; ok {
+               for off := lastReadOffset; off < lastReadOffset+chunkSize-1+me.Readahead; off += chunkSize {
+                       req, ok := t.offsetRequest(off)
+                       if !ok {
+                               break
                        }
-               }
-       }
-
-       // Then finish off incomplete pieces in order of bytes remaining.
-       for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() {
-               index := e.Value.(int)
-               // Stop when we're onto untouched pieces.
-               if !t.PiecePartiallyDownloaded(index) {
-                       break
-               }
-               // Request chunks in random order to reduce overlap with other
-               // connections.
-               for _, cs := range t.Pieces[index].shuffledPendingChunkSpecs() {
-                       if !requestWrapper(request{pp.Integer(index), cs}) {
+                       if me.requestHeat[t][req] >= 2 {
+                               continue
+                       }
+                       if !t.wantChunk(req) {
+                               continue
+                       }
+                       if !requestWrapper(req) {
                                return
                        }
                }
@@ -209,7 +218,7 @@ func (me *responsiveDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {
 }
 
 func (s *responsiveDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {
-       s.lastReadPiece[t] = int(off / int64(t.UsualPieceSize()))
+       s.lastReadOffset[t] = off
        for _len > 0 {
                req, ok := t.offsetRequest(off)
                if !ok {
@@ -226,3 +235,9 @@ func (s *responsiveDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int
                }
        }
 }
+
+func (s *responsiveDownloadStrategy) AssertNotRequested(t *torrent, r request) {
+       if s.requestHeat[t][r] != 0 {
+               panic("outstanding requests invariant broken")
+       }
+}