// Record that we have the chunk.
delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
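+ // The piece now has fewer bytes pending, so reposition it in the by-bytes-left ordering.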
+ t.PiecesByBytesLeft.ValueChanged(t.Pieces[req.Index].bytesLeftElement)
if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
me.queuePieceCheck(t, req.Index)
}
return
}
}
- ppbs := t.piecesByPendingBytes()
// Then request chunks from pieces in order of bytes remaining, in passes of increasing request heat.
for _, heatThreshold := range []int{0, 4, 60} {
- for _, pieceIndex := range ppbs {
- for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() {
+ for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() {
+ pieceIndex := pp.Integer(e.Value.(int))
+ // Iterating the pending chunk spec map already gives an effectively random order, so the explicit shuffle is no longer needed.
+ for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
r := request{pieceIndex, chunkSpec}
if th[r] > heatThreshold {
continue
}
- if !addRequest(request{pieceIndex, chunkSpec}) {
+ if !addRequest(r) {
return
}
}
}
}
// Then finish off incomplete pieces in order of bytes remaining.
- for _, index := range t.piecesByPendingBytes() {
+ for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() {
+ index := e.Value.(int)
// Stop when we're onto untouched pieces.
- if !t.PiecePartiallyDownloaded(int(index)) {
+ if !t.PiecePartiallyDownloaded(index) {
break
}
// Request chunks in random order to reduce overlap with other
// connections.
for _, cs := range t.Pieces[index].shuffledPendingChunkSpecs() {
- if !c.Request(request{index, cs}) {
+ if !c.Request(request{pp.Integer(index), cs}) {
return
}
}
--- /dev/null
+package torrent
+
+import (
+ "container/list"
+)
+
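+// OrderedList wraps a doubly linked list and keeps its elements sorted
+// according to lessFunc.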
+type OrderedList struct {
+ list *list.List
+ lessFunc func(a, b interface{}) bool
+}
+
+func (me *OrderedList) Len() int {
+ return me.list.Len()
+}
+
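+// NewList returns an empty OrderedList whose elements are ordered by lessFunc.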
+func NewList(lessFunc func(a, b interface{}) bool) *OrderedList {
+ return &OrderedList{
+ list: list.New(),
+ lessFunc: lessFunc,
+ }
+}
+
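+// ValueChanged restores the ordering invariant after e's value (or the key it
+// sorts by) has changed, moving e toward the front or back as needed.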
+func (me *OrderedList) ValueChanged(e *list.Element) {
+ for prev := e.Prev(); prev != nil && me.lessFunc(e.Value, prev.Value); prev = e.Prev() {
+ me.list.MoveBefore(e, prev)
+ }
+ for next := e.Next(); next != nil && me.lessFunc(next.Value, e.Value); next = e.Next() {
+ me.list.MoveAfter(e, next)
+ }
+}
+
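+// Insert adds value to the list in its sorted position and returns its
+// element, which callers keep so they can call ValueChanged later.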
+func (me *OrderedList) Insert(value interface{}) (ret *list.Element) {
+ ret = me.list.PushFront(value)
+ me.ValueChanged(ret)
+ return
+}
+
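+// Front returns the least element, or nil if the list is empty.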
+func (me *OrderedList) Front() *list.Element {
+ return me.list.Front()
+}
--- /dev/null
+package torrent
+
+import (
+ "testing"
+)
+
+func TestOrderedList(t *testing.T) {
+ ol := NewList(func(a, b interface{}) bool {
+ return a.(int) < b.(int)
+ })
+ if ol.Len() != 0 {
+ t.FailNow()
+ }
+ e := ol.Insert(0)
+ if ol.Len() != 1 {
+ t.FailNow()
+ }
+ if e.Value.(int) != 0 {
+ t.FailNow()
+ }
+ e = ol.Front()
+ if e.Value.(int) != 0 {
+ t.FailNow()
+ }
+ if e.Next() != nil {
+ t.FailNow()
+ }
+ ol.Insert(1)
+ if e.Next().Value.(int) != 1 {
+ t.FailNow()
+ }
+ ol.Insert(-1)
+ if e.Prev().Value.(int) != -1 {
+ t.FailNow()
+ }
+ e.Value = -2
+ ol.ValueChanged(e)
+ if e.Prev() != nil {
+ t.FailNow()
+ }
+ if e.Next().Value.(int) != -1 {
+ t.FailNow()
+ }
+ if ol.Len() != 3 {
+ t.FailNow()
+ }
+}
"io"
"log"
"net"
- "sort"
)
func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) {
return
}
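+// pieceBytesLeft pairs a piece index with the number of bytes still left to download for it.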
+type pieceBytesLeft struct {
+ Piece, BytesLeft int
+}
+
+type torrentPiece struct {
+ piece
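+ // This piece's element in the torrent's PiecesByBytesLeft list.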
+ bytesLeftElement *list.Element
+}
+
type torrent struct {
- InfoHash InfoHash
- Pieces []*piece
- Data mmap_span.MMapSpan
- Info *metainfo.Info
- Conns []*connection
- Peers []Peer
- Priorities *list.List
+ InfoHash InfoHash
+ Pieces []*torrentPiece
+ PiecesByBytesLeft *OrderedList
+ Data mmap_span.MMapSpan
+ Info *metainfo.Info
+ Conns []*connection
+ Peers []Peer
+ Priorities *list.List
// BEP 12 Multitracker Metadata Extension. The tracker.Client instances
// mirror their respective URLs from the announce-list key.
Trackers [][]tracker.Client
if err != nil {
return
}
- for _, hash := range infoPieceHashes(&md) {
- piece := &piece{}
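+ // Order pieces by bytes left to download, breaking ties by index so the ordering is deterministic.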
+ t.PiecesByBytesLeft = NewList(func(a, b interface{}) bool {
+ apb := t.PieceNumPendingBytes(pp.Integer(a.(int)))
+ bpb := t.PieceNumPendingBytes(pp.Integer(b.(int)))
+ if apb < bpb {
+ return true
+ }
+ if apb > bpb {
+ return false
+ }
+ return a.(int) < b.(int)
+ })
+ for index, hash := range infoPieceHashes(&md) {
+ piece := &torrentPiece{}
copyHashSum(piece.Hash[:], []byte(hash))
t.Pieces = append(t.Pieces, piece)
- t.pendAllChunkSpecs(pp.Integer(len(t.Pieces) - 1))
+ piece.bytesLeftElement = t.PiecesByBytesLeft.Insert(index)
+ t.pendAllChunkSpecs(pp.Integer(index))
}
t.Priorities = list.New()
return
}
func (t *torrent) WriteStatus(w io.Writer) {
+ fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash)
fmt.Fprint(w, "Pieces: ")
for index := range t.Pieces {
fmt.Fprintf(w, "%c", t.pieceStatusChar(index))
return
}
-func (t *torrent) piecesByPendingBytes() (indices []pp.Integer) {
- slice := pieceByBytesPendingSlice{
- Pending: make([]pp.Integer, 0, len(t.Pieces)),
- Indices: make([]pp.Integer, 0, len(t.Pieces)),
- }
- for i := range t.Pieces {
- slice.Pending = append(slice.Pending, t.PieceNumPendingBytes(pp.Integer(i)))
- slice.Indices = append(slice.Indices, pp.Integer(i))
- }
- sort.Sort(slice)
- return slice.Indices
-}
-
// Return the request that would include the given offset into the torrent data.
func torrentOffsetRequest(torrentLength, pieceSize, chunkSize, offset int64) (
r request, ok bool) {
cs[c] = struct{}{}
c.Begin += c.Length
}
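+ // All chunks are pending again, so refresh this piece's position in the by-bytes-left ordering.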
+ t.PiecesByBytesLeft.ValueChanged(piece.bytesLeftElement)
return
}