// Record that we have the chunk.
delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
- t.PiecesByBytesLeft.ValueChanged(t.Pieces[req.Index].bytesLeftElement)
me.dataReady(dataSpec{t.InfoHash, req})
if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
me.queuePieceCheck(t, req.Index)
}
+ t.PieceBytesLeftChanged(int(req.Index))
// Unprioritize the chunk.
me.downloadStrategy.TorrentGotChunk(t, req)
t.pendAllChunkSpecs(piece)
}
}
+ t.PieceBytesLeftChanged(int(piece))
for _, conn := range t.Conns {
if correct {
conn.Post(pp.Message{
}
// Then finish off incomplete pieces in order of bytes remaining.
for _, heatThreshold := range []int{1, 4, 15, 60} {
- for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() {
+ for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
pieceIndex := pp.Integer(e.Value.(int))
for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() {
// for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
}
func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
+ th := me.requestHeat[t]
requestWrapper := func(req request) bool {
if c.RequestPending(req) {
return true
}
again := c.Request(req)
if c.RequestPending(req) {
- me.requestHeat[t][req]++
+ th[req]++
}
return again
}
}()
if lastReadOffset, ok := me.lastReadOffset[t]; ok {
- for off := lastReadOffset; off < lastReadOffset+chunkSize-1+me.Readahead; off += chunkSize {
+ var nextAhead int64
+ for ahead := int64(0); ahead < me.Readahead; ahead = nextAhead {
+ off := lastReadOffset + ahead
req, ok := t.offsetRequest(off)
if !ok {
break
}
- if me.requestHeat[t][req] >= 2 {
+ if !t.wantPiece(int(req.Index)) {
+ nextAhead = ahead + int64(t.PieceLength(req.Index))
continue
}
+ nextAhead = ahead + int64(req.Length)
if !t.wantChunk(req) {
continue
}
+ if th[req] >= func() int {
+ // Determine allowed redundancy based on how far into the
+ // readahead zone we're looking.
+ if ahead >= (2*me.Readahead+2)/3 {
+ return 1
+ } else if ahead >= (me.Readahead+2)/3 {
+ return 2
+ } else {
+ return 3
+ }
+ }() {
+ continue
+ }
if !requestWrapper(req) {
return
}
}
}
+
+ // t.assertIncompletePiecesByBytesLeftOrdering()
+ for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
+ p := e.Value.(int)
+ if !t.PiecePartiallyDownloaded(p) && int(t.PieceLength(pp.Integer(p))) == t.UsualPieceSize() {
+ break
+ }
+ for chunkSpec := range t.Pieces[p].PendingChunkSpecs {
+ r := request{pp.Integer(p), chunkSpec}
+ if th[r] >= 2 {
+ continue
+ }
+ if !requestWrapper(r) {
+ return
+ }
+ }
+ }
}
func (me *responsiveDownloadStrategy) TorrentGotChunk(t *torrent, req request) {
// Front returns the first element of the underlying container/list, or
// nil when the list is empty.
func (me *OrderedList) Front() *list.Element {
	return me.list.Front()
}
+
+// Remove removes e from the underlying list and returns its value,
+// delegating to container/list's Remove. The element must belong to
+// this list.
+func (me *OrderedList) Remove(e *list.Element) interface{} {
+	return me.list.Remove(e)
+}
}
type torrent struct {
-	closed            bool
-	InfoHash          InfoHash
-	Pieces            []*torrentPiece
-	PiecesByBytesLeft *OrderedList
-	Data              mmap_span.MMapSpan
-	length            int64
+	// closing replaces the old closed flag; presumably it is closed to
+	// signal torrent shutdown — NOTE(review): confirm against the close
+	// path, which is not visible in this hunk.
+	closing  chan struct{}
+	InfoHash InfoHash
+	Pieces   []*torrentPiece
+	// IncompletePiecesByBytesLeft holds only pieces that still have
+	// pending bytes, ordered by bytes remaining; complete pieces are
+	// removed via PieceBytesLeftChanged.
+	IncompletePiecesByBytesLeft *OrderedList
+	length                      int64
	// Prevent mutations to Data memory maps while in use as they're not safe.
	dataLock sync.RWMutex
-	Info  *metainfo.Info
-	Conns []*connection
-	Peers map[peersKey]Peer
+	Data mmap_span.MMapSpan
+
+	Info  *metainfo.Info
+	Conns []*connection
+	Peers map[peersKey]Peer
	// BEP 12 Multitracker Metadata Extension. The tracker.Client instances
	// mirror their respective URLs from the announce-list key.
	Trackers     [][]tracker.Client
	metadataHave []bool
}
+// assertIncompletePiecesByBytesLeftOrdering sanity-checks the invariants
+// of IncompletePiecesByBytesLeft: every entry is a unique piece index,
+// no listed piece is complete, entries are ordered by non-decreasing
+// pending bytes, and every incomplete piece appears in the list. It
+// panics on any violation (debug-only check; call sites may be
+// commented out).
+func (t *torrent) assertIncompletePiecesByBytesLeftOrdering() {
+	// Start with the full index set; each index seen in the list is
+	// removed, so leftovers must all be complete pieces.
+	allIndexes := make(map[int]struct{}, t.NumPieces())
+	for i := 0; i < t.NumPieces(); i++ {
+		allIndexes[i] = struct{}{}
+	}
+	var lastBytesLeft int
+	for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
+		i := e.Value.(int)
+		if _, ok := allIndexes[i]; !ok {
+			// Already deleted: the index occurs twice in the list.
+			panic("duplicate entry")
+		}
+		delete(allIndexes, i)
+		if t.Pieces[i].Complete() {
+			panic("complete piece")
+		}
+		bytesLeft := int(t.PieceNumPendingBytes(pp.Integer(i)))
+		if bytesLeft < lastBytesLeft {
+			panic("ordering broken")
+		}
+		lastBytesLeft = bytesLeft
+	}
+	// Anything not present in the list must be a complete piece.
+	for i := range allIndexes {
+		if !t.Pieces[i].Complete() {
+			panic("leaked incomplete piece")
+		}
+	}
+}
+
func (t *torrent) AddPeers(pp []Peer) {
for _, p := range pp {
t.Peers[peersKey{string(p.IP), p.Port}] = p
return
}
t.length = t.Data.Size()
- t.PiecesByBytesLeft = NewList(func(a, b interface{}) bool {
+ t.IncompletePiecesByBytesLeft = NewList(func(a, b interface{}) bool {
apb := t.PieceNumPendingBytes(pp.Integer(a.(int)))
bpb := t.PieceNumPendingBytes(pp.Integer(b.(int)))
if apb < bpb {
piece := &torrentPiece{}
util.CopyExact(piece.Hash[:], hash)
t.Pieces = append(t.Pieces, piece)
- piece.bytesLeftElement = t.PiecesByBytesLeft.Insert(index)
+ piece.bytesLeftElement = t.IncompletePiecesByBytesLeft.Insert(index)
t.pendAllChunkSpecs(pp.Integer(index))
}
+ t.assertIncompletePiecesByBytesLeftOrdering()
for _, conn := range t.Conns {
if err := conn.setNumPieces(t.NumPieces()); err != nil {
log.Printf("closing connection: %s", err)
for _, cs := range t.pieceChunks(int(index)) {
pcss[cs] = struct{}{}
}
- t.PiecesByBytesLeft.ValueChanged(piece.bytesLeftElement)
+ t.IncompletePiecesByBytesLeft.ValueChanged(piece.bytesLeftElement)
return
}
+// PieceBytesLeftChanged must be called after a piece's pending byte
+// count changes. A newly complete piece is dropped from
+// IncompletePiecesByBytesLeft; otherwise its list element is re-sorted.
+// NOTE(review): after Remove, p.bytesLeftElement points at a detached
+// element — if a complete piece can become incomplete again (e.g. a
+// failed hash check re-pending its chunks), it must be re-Inserted
+// before ValueChanged is used; confirm the pend path handles this.
+func (t *torrent) PieceBytesLeftChanged(index int) {
+	p := t.Pieces[index]
+	if p.Complete() {
+		t.IncompletePiecesByBytesLeft.Remove(p.bytesLeftElement)
+	} else {
+		t.IncompletePiecesByBytesLeft.ValueChanged(p.bytesLeftElement)
+	}
+}
+
type Peer struct {
Id [20]byte
IP net.IP