]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Keep pieces sorted by bytes left
authorMatt Joiner <anacrolix@gmail.com>
Wed, 9 Jul 2014 14:26:58 +0000 (00:26 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 9 Jul 2014 14:26:58 +0000 (00:26 +1000)
client.go
download_strategies.go
ordered.go [new file with mode: 0644]
ordered_test.go [new file with mode: 0644]
torrent.go

index 7ab5ca6ab8e6141acc479912a56aa11ca7df967f..1b6f89e8912dc8dd200a65b18b198204c5265faa 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1024,6 +1024,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
 
        // Record that we have the chunk.
        delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
+       t.PiecesByBytesLeft.ValueChanged(t.Pieces[req.Index].bytesLeftElement)
        if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
                me.queuePieceCheck(t, req.Index)
        }
index 8be1be40c5ff38e8647b16a87af4bb989d9c2690..c6d0e1e1afa629e691032ed0953528d22413a5e3 100644 (file)
@@ -50,16 +50,17 @@ func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
                        return
                }
        }
-       ppbs := t.piecesByPendingBytes()
        // Then finish off incomplete pieces in order of bytes remaining.
        for _, heatThreshold := range []int{0, 4, 60} {
-               for _, pieceIndex := range ppbs {
-                       for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() {
+               for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() {
+                       pieceIndex := pp.Integer(e.Value.(int))
+                       // for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() {
+                       for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
                                r := request{pieceIndex, chunkSpec}
                                if th[r] > heatThreshold {
                                        continue
                                }
-                               if !addRequest(request{pieceIndex, chunkSpec}) {
+                               if !addRequest(r) {
                                        return
                                }
                        }
@@ -121,15 +122,16 @@ func (me *ResponsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
                }
        }
        // Then finish off incomplete pieces in order of bytes remaining.
-       for _, index := range t.piecesByPendingBytes() {
+       for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() {
+               index := e.Value.(int)
                // Stop when we're onto untouched pieces.
-               if !t.PiecePartiallyDownloaded(int(index)) {
+               if !t.PiecePartiallyDownloaded(index) {
                        break
                }
                // Request chunks in random order to reduce overlap with other
                // connections.
                for _, cs := range t.Pieces[index].shuffledPendingChunkSpecs() {
-                       if !c.Request(request{index, cs}) {
+                       if !c.Request(request{pp.Integer(index), cs}) {
                                return
                        }
                }
diff --git a/ordered.go b/ordered.go
new file mode 100644 (file)
index 0000000..6a97da3
--- /dev/null
@@ -0,0 +1,40 @@
+package torrent
+
+import (
+       "container/list"
+)
+
+type OrderedList struct {
+       list     *list.List
+       lessFunc func(a, b interface{}) bool
+}
+
+func (me *OrderedList) Len() int {
+       return me.list.Len()
+}
+
+func NewList(lessFunc func(a, b interface{}) bool) *OrderedList {
+       return &OrderedList{
+               list:     list.New(),
+               lessFunc: lessFunc,
+       }
+}
+
+func (me *OrderedList) ValueChanged(e *list.Element) {
+       for prev := e.Prev(); prev != nil && me.lessFunc(e.Value, prev.Value); prev = e.Prev() {
+               me.list.MoveBefore(e, prev)
+       }
+       for next := e.Next(); next != nil && me.lessFunc(next.Value, e.Value); next = e.Next() {
+               me.list.MoveAfter(e, next)
+       }
+}
+
+func (me *OrderedList) Insert(value interface{}) (ret *list.Element) {
+       ret = me.list.PushFront(value)
+       me.ValueChanged(ret)
+       return
+}
+
+func (me *OrderedList) Front() *list.Element {
+       return me.list.Front()
+}
diff --git a/ordered_test.go b/ordered_test.go
new file mode 100644 (file)
index 0000000..7c4c4b0
--- /dev/null
@@ -0,0 +1,47 @@
+package torrent
+
+import (
+       "testing"
+)
+
+func TestOrderedList(t *testing.T) {
+       ol := NewList(func(a, b interface{}) bool {
+               return a.(int) < b.(int)
+       })
+       if ol.Len() != 0 {
+               t.FailNow()
+       }
+       e := ol.Insert(0)
+       if ol.Len() != 1 {
+               t.FailNow()
+       }
+       if e.Value.(int) != 0 {
+               t.FailNow()
+       }
+       e = ol.Front()
+       if e.Value.(int) != 0 {
+               t.FailNow()
+       }
+       if e.Next() != nil {
+               t.FailNow()
+       }
+       ol.Insert(1)
+       if e.Next().Value.(int) != 1 {
+               t.FailNow()
+       }
+       ol.Insert(-1)
+       if e.Prev().Value.(int) != -1 {
+               t.FailNow()
+       }
+       e.Value = -2
+       ol.ValueChanged(e)
+       if e.Prev() != nil {
+               t.FailNow()
+       }
+       if e.Next().Value.(int) != -1 {
+               t.FailNow()
+       }
+       if ol.Len() != 3 {
+               t.FailNow()
+       }
+}
index 96fd13295548f1b21e1364c7a1bfe7870e46acfd..e6844f1c03d468a1fa4b42c8149d982bb27241d6 100644 (file)
@@ -11,7 +11,6 @@ import (
        "io"
        "log"
        "net"
-       "sort"
 )
 
 func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) {
@@ -26,14 +25,24 @@ func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) {
        return
 }
 
+type pieceBytesLeft struct {
+       Piece, BytesLeft int
+}
+
+type torrentPiece struct {
+       piece
+       bytesLeftElement *list.Element
+}
+
 type torrent struct {
-       InfoHash   InfoHash
-       Pieces     []*piece
-       Data       mmap_span.MMapSpan
-       Info       *metainfo.Info
-       Conns      []*connection
-       Peers      []Peer
-       Priorities *list.List
+       InfoHash          InfoHash
+       Pieces            []*torrentPiece
+       PiecesByBytesLeft *OrderedList
+       Data              mmap_span.MMapSpan
+       Info              *metainfo.Info
+       Conns             []*connection
+       Peers             []Peer
+       Priorities        *list.List
        // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
        // mirror their respective URLs from the announce-list key.
        Trackers      [][]tracker.Client
@@ -89,11 +98,23 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte
        if err != nil {
                return
        }
-       for _, hash := range infoPieceHashes(&md) {
-               piece := &piece{}
+       t.PiecesByBytesLeft = NewList(func(a, b interface{}) bool {
+               apb := t.PieceNumPendingBytes(pp.Integer(a.(int)))
+               bpb := t.PieceNumPendingBytes(pp.Integer(b.(int)))
+               if apb < bpb {
+                       return true
+               }
+               if apb > bpb {
+                       return false
+               }
+               return a.(int) < b.(int)
+       })
+       for index, hash := range infoPieceHashes(&md) {
+               piece := &torrentPiece{}
                copyHashSum(piece.Hash[:], []byte(hash))
                t.Pieces = append(t.Pieces, piece)
-               t.pendAllChunkSpecs(pp.Integer(len(t.Pieces) - 1))
+               piece.bytesLeftElement = t.PiecesByBytesLeft.Insert(index)
+               t.pendAllChunkSpecs(pp.Integer(index))
        }
        t.Priorities = list.New()
        return
@@ -170,6 +191,7 @@ func (t *torrent) NewMetadataExtensionMessage(c *connection, msgType int, piece
 }
 
 func (t *torrent) WriteStatus(w io.Writer) {
+       fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash)
        fmt.Fprint(w, "Pieces: ")
        for index := range t.Pieces {
                fmt.Fprintf(w, "%c", t.pieceStatusChar(index))
@@ -251,19 +273,6 @@ func (t *torrent) Close() (err error) {
        return
 }
 
-func (t *torrent) piecesByPendingBytes() (indices []pp.Integer) {
-       slice := pieceByBytesPendingSlice{
-               Pending: make([]pp.Integer, 0, len(t.Pieces)),
-               Indices: make([]pp.Integer, 0, len(t.Pieces)),
-       }
-       for i := range t.Pieces {
-               slice.Pending = append(slice.Pending, t.PieceNumPendingBytes(pp.Integer(i)))
-               slice.Indices = append(slice.Indices, pp.Integer(i))
-       }
-       sort.Sort(slice)
-       return slice.Indices
-}
-
 // Return the request that would include the given offset into the torrent data.
 func torrentOffsetRequest(torrentLength, pieceSize, chunkSize, offset int64) (
        r request, ok bool) {
@@ -330,6 +339,7 @@ func (t *torrent) pendAllChunkSpecs(index pp.Integer) {
                cs[c] = struct{}{}
                c.Begin += c.Length
        }
+       t.PiecesByBytesLeft.ValueChanged(piece.bytesLeftElement)
        return
 }