]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Improve incomplete piece ordering, responsive download strategy
authorMatt Joiner <anacrolix@gmail.com>
Sun, 24 Aug 2014 19:31:34 +0000 (05:31 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 24 Aug 2014 19:31:34 +0000 (05:31 +1000)
client.go
download_strategies.go
ordered.go
torrent.go

index a513a0aee583df64d4b2c0ae77ff1ae676d8d84f..dc8629699ce8b0ba9a266ba4331a189d225fe5c3 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1241,11 +1241,11 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
 
        // Record that we have the chunk.
        delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
-       t.PiecesByBytesLeft.ValueChanged(t.Pieces[req.Index].bytesLeftElement)
        me.dataReady(dataSpec{t.InfoHash, req})
        if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
                me.queuePieceCheck(t, req.Index)
        }
+       t.PieceBytesLeftChanged(int(req.Index))
 
        // Unprioritize the chunk.
        me.downloadStrategy.TorrentGotChunk(t, req)
@@ -1301,6 +1301,7 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
                        t.pendAllChunkSpecs(piece)
                }
        }
+       t.PieceBytesLeftChanged(int(piece))
        for _, conn := range t.Conns {
                if correct {
                        conn.Post(pp.Message{
index db31bc3aec46599e12fe7b5764c8c90b76901867..1de19a5b5682fbe43a13c11b5b60126ee5717a10 100644 (file)
@@ -62,7 +62,7 @@ func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
        }
        // Then finish off incomplete pieces in order of bytes remaining.
        for _, heatThreshold := range []int{1, 4, 15, 60} {
-               for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() {
+               for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
                        pieceIndex := pp.Integer(e.Value.(int))
                        for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() {
                                // for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
@@ -154,13 +154,14 @@ func (me *responsiveDownloadStrategy) DeleteRequest(t *torrent, r request) {
 }
 
 func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
+       th := me.requestHeat[t]
        requestWrapper := func(req request) bool {
                if c.RequestPending(req) {
                        return true
                }
                again := c.Request(req)
                if c.RequestPending(req) {
-                       me.requestHeat[t][req]++
+                       th[req]++
                }
                return again
        }
@@ -189,22 +190,56 @@ func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
        }()
 
        if lastReadOffset, ok := me.lastReadOffset[t]; ok {
-               for off := lastReadOffset; off < lastReadOffset+chunkSize-1+me.Readahead; off += chunkSize {
+               var nextAhead int64
+               for ahead := int64(0); ahead < me.Readahead; ahead = nextAhead {
+                       off := lastReadOffset + ahead
                        req, ok := t.offsetRequest(off)
                        if !ok {
                                break
                        }
-                       if me.requestHeat[t][req] >= 2 {
+                       if !t.wantPiece(int(req.Index)) {
+                               nextAhead = ahead + int64(t.PieceLength(req.Index))
                                continue
                        }
+                       nextAhead = ahead + int64(req.Length)
                        if !t.wantChunk(req) {
                                continue
                        }
+                       if th[req] >= func() int {
+                               // Determine allowed redundancy based on how far into the
+                               // readahead zone we're looking.
+                               if ahead >= (2*me.Readahead+2)/3 {
+                                       return 1
+                               } else if ahead >= (me.Readahead+2)/3 {
+                                       return 2
+                               } else {
+                                       return 3
+                               }
+                       }() {
+                               continue
+                       }
                        if !requestWrapper(req) {
                                return
                        }
                }
        }
+
+       // t.assertIncompletePiecesByBytesLeftOrdering()
+       for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
+               p := e.Value.(int)
+               if !t.PiecePartiallyDownloaded(p) && int(t.PieceLength(pp.Integer(p))) == t.UsualPieceSize() {
+                       break
+               }
+               for chunkSpec := range t.Pieces[p].PendingChunkSpecs {
+                       r := request{pp.Integer(p), chunkSpec}
+                       if th[r] >= 2 {
+                               continue
+                       }
+                       if !requestWrapper(r) {
+                               return
+                       }
+               }
+       }
 }
 
 func (me *responsiveDownloadStrategy) TorrentGotChunk(t *torrent, req request) {
index 6a97da34bc0108e1df5936f21f8b84417058f0cd..9c5460545acef6f031d305c741d005db378201f1 100644 (file)
@@ -38,3 +38,7 @@ func (me *OrderedList) Insert(value interface{}) (ret *list.Element) {
 func (me *OrderedList) Front() *list.Element {
        return me.list.Front()
 }
+
+func (me *OrderedList) Remove(e *list.Element) interface{} {
+       return me.list.Remove(e)
+}
index 792ce20f201f4dfe47c9ba56d86e7d264998118a..07a18c4878883a11f387496555f28ec051d88972 100644 (file)
@@ -44,17 +44,18 @@ type peersKey struct {
 }
 
 type torrent struct {
-       closed            bool
-       InfoHash          InfoHash
-       Pieces            []*torrentPiece
-       PiecesByBytesLeft *OrderedList
-       Data              mmap_span.MMapSpan
-       length            int64
+       closing                     chan struct{}
+       InfoHash                    InfoHash
+       Pieces                      []*torrentPiece
+       IncompletePiecesByBytesLeft *OrderedList
+       length                      int64
        // Prevent mutations to Data memory maps while in use as they're not safe.
        dataLock sync.RWMutex
-       Info     *metainfo.Info
-       Conns    []*connection
-       Peers    map[peersKey]Peer
+       Data     mmap_span.MMapSpan
+
+       Info  *metainfo.Info
+       Conns []*connection
+       Peers map[peersKey]Peer
        // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
        // mirror their respective URLs from the announce-list key.
        Trackers     [][]tracker.Client
@@ -63,6 +64,34 @@ type torrent struct {
        metadataHave []bool
 }
 
+func (t *torrent) assertIncompletePiecesByBytesLeftOrdering() {
+       allIndexes := make(map[int]struct{}, t.NumPieces())
+       for i := 0; i < t.NumPieces(); i++ {
+               allIndexes[i] = struct{}{}
+       }
+       var lastBytesLeft int
+       for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
+               i := e.Value.(int)
+               if _, ok := allIndexes[i]; !ok {
+                       panic("duplicate entry")
+               }
+               delete(allIndexes, i)
+               if t.Pieces[i].Complete() {
+                       panic("complete piece")
+               }
+               bytesLeft := int(t.PieceNumPendingBytes(pp.Integer(i)))
+               if bytesLeft < lastBytesLeft {
+                       panic("ordering broken")
+               }
+               lastBytesLeft = bytesLeft
+       }
+       for i := range allIndexes {
+               if !t.Pieces[i].Complete() {
+                       panic("leaked incomplete piece")
+               }
+       }
+}
+
 func (t *torrent) AddPeers(pp []Peer) {
        for _, p := range pp {
                t.Peers[peersKey{string(p.IP), p.Port}] = p
@@ -124,7 +153,7 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte
                return
        }
        t.length = t.Data.Size()
-       t.PiecesByBytesLeft = NewList(func(a, b interface{}) bool {
+       t.IncompletePiecesByBytesLeft = NewList(func(a, b interface{}) bool {
                apb := t.PieceNumPendingBytes(pp.Integer(a.(int)))
                bpb := t.PieceNumPendingBytes(pp.Integer(b.(int)))
                if apb < bpb {
@@ -139,9 +168,10 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte
                piece := &torrentPiece{}
                util.CopyExact(piece.Hash[:], hash)
                t.Pieces = append(t.Pieces, piece)
-               piece.bytesLeftElement = t.PiecesByBytesLeft.Insert(index)
+               piece.bytesLeftElement = t.IncompletePiecesByBytesLeft.Insert(index)
                t.pendAllChunkSpecs(pp.Integer(index))
        }
+       t.assertIncompletePiecesByBytesLeftOrdering()
        for _, conn := range t.Conns {
                if err := conn.setNumPieces(t.NumPieces()); err != nil {
                        log.Printf("closing connection: %s", err)
@@ -391,10 +421,19 @@ func (t *torrent) pendAllChunkSpecs(index pp.Integer) {
        for _, cs := range t.pieceChunks(int(index)) {
                pcss[cs] = struct{}{}
        }
-       t.PiecesByBytesLeft.ValueChanged(piece.bytesLeftElement)
+       t.IncompletePiecesByBytesLeft.ValueChanged(piece.bytesLeftElement)
        return
 }
 
+func (t *torrent) PieceBytesLeftChanged(index int) {
+       p := t.Pieces[index]
+       if p.Complete() {
+               t.IncompletePiecesByBytesLeft.Remove(p.bytesLeftElement)
+       } else {
+               t.IncompletePiecesByBytesLeft.ValueChanged(p.bytesLeftElement)
+       }
+}
+
 type Peer struct {
        Id     [20]byte
        IP     net.IP