From: Matt Joiner Date: Wed, 9 Jul 2014 14:26:58 +0000 (+1000) Subject: Keep pieces sorted by bytes left X-Git-Tag: v1.0.0~1679 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=4c2d07337d274740b99b15c6d367e937bbdf072c;p=btrtrc.git Keep pieces sorted by bytes left --- diff --git a/client.go b/client.go index 7ab5ca6a..1b6f89e8 100644 --- a/client.go +++ b/client.go @@ -1024,6 +1024,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er // Record that we have the chunk. delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec) + t.PiecesByBytesLeft.ValueChanged(t.Pieces[req.Index].bytesLeftElement) if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 { me.queuePieceCheck(t, req.Index) } diff --git a/download_strategies.go b/download_strategies.go index 8be1be40..c6d0e1e1 100644 --- a/download_strategies.go +++ b/download_strategies.go @@ -50,16 +50,17 @@ func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) { return } } - ppbs := t.piecesByPendingBytes() // Then finish off incomplete pieces in order of bytes remaining. for _, heatThreshold := range []int{0, 4, 60} { - for _, pieceIndex := range ppbs { - for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() { + for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() { + pieceIndex := pp.Integer(e.Value.(int)) + // for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() { + for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs { r := request{pieceIndex, chunkSpec} if th[r] > heatThreshold { continue } - if !addRequest(request{pieceIndex, chunkSpec}) { + if !addRequest(r) { return } } @@ -121,15 +122,16 @@ func (me *ResponsiveDownloadStrategy) FillRequests(t *torrent, c *connection) { } } // Then finish off incomplete pieces in order of bytes remaining. - for _, index := range t.piecesByPendingBytes() { + for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() { + index := e.Value.(int) // Stop when we're onto untouched pieces. - if !t.PiecePartiallyDownloaded(int(index)) { + if !t.PiecePartiallyDownloaded(index) { break } // Request chunks in random order to reduce overlap with other // connections. for _, cs := range t.Pieces[index].shuffledPendingChunkSpecs() { - if !c.Request(request{index, cs}) { + if !c.Request(request{pp.Integer(index), cs}) { return } } diff --git a/ordered.go b/ordered.go new file mode 100644 index 00000000..6a97da34 --- /dev/null +++ b/ordered.go @@ -0,0 +1,40 @@ +package torrent + +import ( + "container/list" +) + +type OrderedList struct { + list *list.List + lessFunc func(a, b interface{}) bool +} + +func (me *OrderedList) Len() int { + return me.list.Len() +} + +func NewList(lessFunc func(a, b interface{}) bool) *OrderedList { + return &OrderedList{ + list: list.New(), + lessFunc: lessFunc, + } +} + +func (me *OrderedList) ValueChanged(e *list.Element) { + for prev := e.Prev(); prev != nil && me.lessFunc(e.Value, prev.Value); prev = e.Prev() { + me.list.MoveBefore(e, prev) + } + for next := e.Next(); next != nil && me.lessFunc(next.Value, e.Value); next = e.Next() { + me.list.MoveAfter(e, next) + } +} + +func (me *OrderedList) Insert(value interface{}) (ret *list.Element) { + ret = me.list.PushFront(value) + me.ValueChanged(ret) + return +} + +func (me *OrderedList) Front() *list.Element { + return me.list.Front() +} diff --git a/ordered_test.go b/ordered_test.go new file mode 100644 index 00000000..7c4c4b08 --- /dev/null +++ b/ordered_test.go @@ -0,0 +1,47 @@ +package torrent + +import ( + "testing" +) + +func TestOrderedList(t *testing.T) { + ol := NewList(func(a, b interface{}) bool { + return a.(int) < b.(int) + }) + if ol.Len() != 0 { + t.FailNow() + } + e := ol.Insert(0) + if ol.Len() != 1 { + t.FailNow() + } + if e.Value.(int) != 0 { + t.FailNow() + } + e = ol.Front() + if e.Value.(int) != 0 { + t.FailNow() + } + if e.Next() != nil { + t.FailNow() + } + ol.Insert(1) + if e.Next().Value.(int) != 1 { + t.FailNow() + } + ol.Insert(-1) + if e.Prev().Value.(int) != -1 { + t.FailNow() + } + e.Value = -2 + ol.ValueChanged(e) + if e.Prev() != nil { + t.FailNow() + } + if e.Next().Value.(int) != -1 { + t.FailNow() + } + if ol.Len() != 3 { + t.FailNow() + } +} diff --git a/torrent.go b/torrent.go index 96fd1329..e6844f1c 100644 --- a/torrent.go +++ b/torrent.go @@ -11,7 +11,6 @@ import ( "io" "log" "net" - "sort" ) func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) { @@ -26,14 +25,24 @@ func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) { return } +type pieceBytesLeft struct { + Piece, BytesLeft int +} + +type torrentPiece struct { + piece + bytesLeftElement *list.Element +} + type torrent struct { - InfoHash InfoHash - Pieces []*piece - Data mmap_span.MMapSpan - Info *metainfo.Info - Conns []*connection - Peers []Peer - Priorities *list.List + InfoHash InfoHash + Pieces []*torrentPiece + PiecesByBytesLeft *OrderedList + Data mmap_span.MMapSpan + Info *metainfo.Info + Conns []*connection + Peers []Peer + Priorities *list.List // BEP 12 Multitracker Metadata Extension. The tracker.Client instances // mirror their respective URLs from the announce-list key. Trackers [][]tracker.Client @@ -89,11 +98,23 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte if err != nil { return } - for _, hash := range infoPieceHashes(&md) { - piece := &piece{} + t.PiecesByBytesLeft = NewList(func(a, b interface{}) bool { + apb := t.PieceNumPendingBytes(pp.Integer(a.(int))) + bpb := t.PieceNumPendingBytes(pp.Integer(b.(int))) + if apb < bpb { + return true + } + if apb > bpb { + return false + } + return a.(int) < b.(int) + }) + for index, hash := range infoPieceHashes(&md) { + piece := &torrentPiece{} copyHashSum(piece.Hash[:], []byte(hash)) t.Pieces = append(t.Pieces, piece) - t.pendAllChunkSpecs(pp.Integer(len(t.Pieces) - 1)) + piece.bytesLeftElement = t.PiecesByBytesLeft.Insert(index) + t.pendAllChunkSpecs(pp.Integer(index)) } t.Priorities = list.New() return @@ -170,6 +191,7 @@ func (t *torrent) NewMetadataExtensionMessage(c *connection, msgType int, piece } func (t *torrent) WriteStatus(w io.Writer) { + fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash) fmt.Fprint(w, "Pieces: ") for index := range t.Pieces { fmt.Fprintf(w, "%c", t.pieceStatusChar(index)) @@ -251,19 +273,6 @@ func (t *torrent) Close() (err error) { return } -func (t *torrent) piecesByPendingBytes() (indices []pp.Integer) { - slice := pieceByBytesPendingSlice{ - Pending: make([]pp.Integer, 0, len(t.Pieces)), - Indices: make([]pp.Integer, 0, len(t.Pieces)), - } - for i := range t.Pieces { - slice.Pending = append(slice.Pending, t.PieceNumPendingBytes(pp.Integer(i))) - slice.Indices = append(slice.Indices, pp.Integer(i)) - } - sort.Sort(slice) - return slice.Indices -} - // Return the request that would include the given offset into the torrent data. func torrentOffsetRequest(torrentLength, pieceSize, chunkSize, offset int64) ( r request, ok bool) { @@ -330,6 +339,7 @@ func (t *torrent) pendAllChunkSpecs(index pp.Integer) { cs[c] = struct{}{} c.Begin += c.Length } + t.PiecesByBytesLeft.ValueChanged(piece.bytesLeftElement) return }