// Record that we have the chunk.
delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
t.PiecesByBytesLeft.ValueChanged(t.Pieces[req.Index].bytesLeftElement)
+ me.dataReady(dataSpec{t.InfoHash, req})
if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
me.queuePieceCheck(t, req.Index)
}
me.downloadStrategy.TorrentGotChunk(t, req)
// Cancel pending requests for this chunk.
- cancelled := false
for _, c := range t.Conns {
if me.connCancel(t, c, req) {
- cancelled = true
me.replenishConnRequests(t, c)
}
}
- if cancelled {
- log.Printf("cancelled concurrent requests for %v", req)
- }
- me.dataReady(dataSpec{t.InfoHash, req})
+ me.downloadStrategy.AssertNotRequested(t, req)
+
return nil
}
TorrentGotChunk(t *torrent, r request)
TorrentGotPiece(t *torrent, piece int)
WriteStatus(w io.Writer)
+ AssertNotRequested(*torrent, request)
}
type DefaultDownloadStrategy struct {
heat map[*torrent]map[request]int
}
+func (me *DefaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {
+ if me.heat[t][r] != 0 {
+ panic("outstanding requests break invariant")
+ }
+}
+
func (me *DefaultDownloadStrategy) WriteStatus(w io.Writer) {}
func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
func (me *DefaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {}
func (*DefaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {}
-func NewResponsiveDownloadStrategy(readahead int) *responsiveDownloadStrategy {
+func NewResponsiveDownloadStrategy(readahead int64) *responsiveDownloadStrategy {
return &responsiveDownloadStrategy{
- Readahead: readahead,
- lastReadPiece: make(map[*torrent]int),
- priorities: make(map[*torrent]map[request]struct{}),
+ Readahead: readahead,
+ lastReadOffset: make(map[*torrent]int64),
+ priorities: make(map[*torrent]map[request]struct{}),
+ requestHeat: make(map[*torrent]map[request]int),
}
}
type responsiveDownloadStrategy struct {
// How many bytes to preemptively download starting at the beginning of
// the last piece read for a given torrent.
- Readahead int
- lastReadPiece map[*torrent]int
- priorities map[*torrent]map[request]struct{}
+ Readahead int64
+ lastReadOffset map[*torrent]int64
+ priorities map[*torrent]map[request]struct{}
+ requestHeat map[*torrent]map[request]int
}
func (me *responsiveDownloadStrategy) WriteStatus(w io.Writer) {
func (me *responsiveDownloadStrategy) TorrentStarted(t *torrent) {
me.priorities[t] = make(map[request]struct{})
+ me.requestHeat[t] = make(map[request]int)
}
func (me *responsiveDownloadStrategy) TorrentStopped(t *torrent) {
- delete(me.lastReadPiece, t)
+ delete(me.lastReadOffset, t)
delete(me.priorities, t)
}
-func (responsiveDownloadStrategy) DeleteRequest(*torrent, request) {}
+func (me *responsiveDownloadStrategy) DeleteRequest(t *torrent, r request) {
+ rh := me.requestHeat[t]
+ if rh[r] <= 0 {
+ panic("request heat invariant broken")
+ }
+ rh[r]--
+}
func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
+ requestWrapper := func(req request) bool {
+ if c.RequestPending(req) {
+ return true
+ }
+ again := c.Request(req)
+ if c.RequestPending(req) {
+ me.requestHeat[t][req]++
+ }
+ return again
+ }
+
for req := range me.priorities[t] {
if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
panic(req)
}
- if !c.Request(req) {
+ if !requestWrapper(req) {
return
}
}
- if len(c.Requests) >= 32 {
+ if len(c.Requests) >= 16 {
return
}
- // Short circuit request fills at a level that might reduce receiving of
- // unnecessary chunks.
- requestWrapper := func(r request) bool {
- if len(c.Requests) >= 64 {
- return false
- }
- return c.Request(r)
- }
-
- requestPiece := func(piece int) bool {
- if piece >= t.NumPieces() {
- return true
- }
- for _, cs := range t.Pieces[piece].shuffledPendingChunkSpecs() {
- if !requestWrapper(request{pp.Integer(piece), cs}) {
+ requestWrapper = func() func(request) bool {
+ f := requestWrapper
+ return func(req request) bool {
+ if len(c.Requests) >= 32 {
return false
}
+ return f(req)
}
- return true
- }
+ }()
- if lastReadPiece, ok := me.lastReadPiece[t]; ok {
- readaheadPieces := (me.Readahead + t.UsualPieceSize() - 1) / t.UsualPieceSize()
- for i := 0; i < readaheadPieces; i++ {
- if !requestPiece(lastReadPiece + i) {
- return
+ if lastReadOffset, ok := me.lastReadOffset[t]; ok {
+ for off := lastReadOffset; off < lastReadOffset+chunkSize-1+me.Readahead; off += chunkSize {
+ req, ok := t.offsetRequest(off)
+ if !ok {
+ break
}
- }
- }
-
- // Then finish off incomplete pieces in order of bytes remaining.
- for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() {
- index := e.Value.(int)
- // Stop when we're onto untouched pieces.
- if !t.PiecePartiallyDownloaded(index) {
- break
- }
- // Request chunks in random order to reduce overlap with other
- // connections.
- for _, cs := range t.Pieces[index].shuffledPendingChunkSpecs() {
- if !requestWrapper(request{pp.Integer(index), cs}) {
+ if me.requestHeat[t][req] >= 2 {
+ continue
+ }
+ if !t.wantChunk(req) {
+ continue
+ }
+ if !requestWrapper(req) {
return
}
}
}
func (s *responsiveDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {
- s.lastReadPiece[t] = int(off / int64(t.UsualPieceSize()))
+ s.lastReadOffset[t] = off
for _len > 0 {
req, ok := t.offsetRequest(off)
if !ok {
}
}
}
+
+func (s *responsiveDownloadStrategy) AssertNotRequested(t *torrent, r request) {
+ if s.requestHeat[t][r] != 0 {
+ panic("outstanding requests invariant broken")
+ }
+}