From: Matt Joiner <anacrolix@gmail.com>
Date: Sun, 24 Aug 2014 19:31:34 +0000 (+1000)
Subject: Improve incomplete piece ordering, responsive download strategy
X-Git-Tag: v1.0.0~1602
X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=c8f335182f81d645cb3040b1f61fee779aa757a3;p=btrtrc.git

Improve incomplete piece ordering, responsive download strategy
---

diff --git a/client.go b/client.go
index a513a0ae..dc862969 100644
--- a/client.go
+++ b/client.go
@@ -1241,11 +1241,11 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
 
 	// Record that we have the chunk.
 	delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
-	t.PiecesByBytesLeft.ValueChanged(t.Pieces[req.Index].bytesLeftElement)
 	me.dataReady(dataSpec{t.InfoHash, req})
 	if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
 		me.queuePieceCheck(t, req.Index)
 	}
+	t.PieceBytesLeftChanged(int(req.Index))
 
 	// Unprioritize the chunk.
 	me.downloadStrategy.TorrentGotChunk(t, req)
@@ -1301,6 +1301,7 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
 			t.pendAllChunkSpecs(piece)
 		}
 	}
+	t.PieceBytesLeftChanged(int(piece))
 	for _, conn := range t.Conns {
 		if correct {
 			conn.Post(pp.Message{
diff --git a/download_strategies.go b/download_strategies.go
index db31bc3a..1de19a5b 100644
--- a/download_strategies.go
+++ b/download_strategies.go
@@ -62,7 +62,7 @@ func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
 	}
 	// Then finish off incomplete pieces in order of bytes remaining.
 	for _, heatThreshold := range []int{1, 4, 15, 60} {
-		for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() {
+		for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
 			pieceIndex := pp.Integer(e.Value.(int))
 			for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() {
 				// for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
@@ -154,13 +154,14 @@ func (me *responsiveDownloadStrategy) DeleteRequest(t *torrent, r request) {
 }
 
 func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
+	th := me.requestHeat[t]
 	requestWrapper := func(req request) bool {
 		if c.RequestPending(req) {
 			return true
 		}
 		again := c.Request(req)
 		if c.RequestPending(req) {
-			me.requestHeat[t][req]++
+			th[req]++
 		}
 		return again
 	}
@@ -189,22 +190,56 @@ func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
 	}()
 
 	if lastReadOffset, ok := me.lastReadOffset[t]; ok {
-		for off := lastReadOffset; off < lastReadOffset+chunkSize-1+me.Readahead; off += chunkSize {
+		var nextAhead int64
+		for ahead := int64(0); ahead < me.Readahead; ahead = nextAhead {
+			off := lastReadOffset + ahead
 			req, ok := t.offsetRequest(off)
 			if !ok {
 				break
 			}
-			if me.requestHeat[t][req] >= 2 {
+			if !t.wantPiece(int(req.Index)) {
+				nextAhead = ahead + int64(t.PieceLength(req.Index))
 				continue
 			}
+			nextAhead = ahead + int64(req.Length)
 			if !t.wantChunk(req) {
 				continue
 			}
+			if th[req] >= func() int {
+				// Determine allowed redundancy based on how far into the
+				// readahead zone we're looking.
+				if ahead >= (2*me.Readahead+2)/3 {
+					return 1
+				} else if ahead >= (me.Readahead+2)/3 {
+					return 2
+				} else {
+					return 3
+				}
+			}() {
+				continue
+			}
 			if !requestWrapper(req) {
 				return
 			}
 		}
 	}
+
+	// t.assertIncompletePiecesByBytesLeftOrdering()
+	for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
+		p := e.Value.(int)
+		if !t.PiecePartiallyDownloaded(p) && int(t.PieceLength(pp.Integer(p))) == t.UsualPieceSize() {
+			break
+		}
+		for chunkSpec := range t.Pieces[p].PendingChunkSpecs {
+			r := request{pp.Integer(p), chunkSpec}
+			if th[r] >= 2 {
+				continue
+			}
+			if !requestWrapper(r) {
+				return
+			}
+		}
+	}
 }
 
 func (me *responsiveDownloadStrategy) TorrentGotChunk(t *torrent, req request) {
diff --git a/ordered.go b/ordered.go
index 6a97da34..9c546054 100644
--- a/ordered.go
+++ b/ordered.go
@@ -38,3 +38,7 @@ func (me *OrderedList) Insert(value interface{}) (ret *list.Element) {
 func (me *OrderedList) Front() *list.Element {
 	return me.list.Front()
 }
+
+func (me *OrderedList) Remove(e *list.Element) interface{} {
+	return me.list.Remove(e)
+}
diff --git a/torrent.go b/torrent.go
index 792ce20f..07a18c48 100644
--- a/torrent.go
+++ b/torrent.go
@@ -44,17 +44,18 @@ type peersKey struct {
 }
 
 type torrent struct {
-	closed            bool
-	InfoHash          InfoHash
-	Pieces            []*torrentPiece
-	PiecesByBytesLeft *OrderedList
-	Data              mmap_span.MMapSpan
-	length            int64
+	closing                     chan struct{}
+	InfoHash                    InfoHash
+	Pieces                      []*torrentPiece
+	IncompletePiecesByBytesLeft *OrderedList
+	length                      int64
 	// Prevent mutations to Data memory maps while in use as they're not safe.
 	dataLock sync.RWMutex
-	Info     *metainfo.Info
-	Conns    []*connection
-	Peers    map[peersKey]Peer
+	Data     mmap_span.MMapSpan
+
+	Info  *metainfo.Info
+	Conns []*connection
+	Peers map[peersKey]Peer
 	// BEP 12 Multitracker Metadata Extension. The tracker.Client instances
 	// mirror their respective URLs from the announce-list key.
 	Trackers     [][]tracker.Client
@@ -63,6 +64,34 @@ type torrent struct {
 	metadataHave []bool
 }
 
+func (t *torrent) assertIncompletePiecesByBytesLeftOrdering() {
+	allIndexes := make(map[int]struct{}, t.NumPieces())
+	for i := 0; i < t.NumPieces(); i++ {
+		allIndexes[i] = struct{}{}
+	}
+	var lastBytesLeft int
+	for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
+		i := e.Value.(int)
+		if _, ok := allIndexes[i]; !ok {
+			panic("duplicate entry")
+		}
+		delete(allIndexes, i)
+		if t.Pieces[i].Complete() {
+			panic("complete piece")
+		}
+		bytesLeft := int(t.PieceNumPendingBytes(pp.Integer(i)))
+		if bytesLeft < lastBytesLeft {
+			panic("ordering broken")
+		}
+		lastBytesLeft = bytesLeft
+	}
+	for i := range allIndexes {
+		if !t.Pieces[i].Complete() {
+			panic("leaked incomplete piece")
+		}
+	}
+}
+
 func (t *torrent) AddPeers(pp []Peer) {
 	for _, p := range pp {
 		t.Peers[peersKey{string(p.IP), p.Port}] = p
@@ -124,7 +153,7 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte
 		return
 	}
 	t.length = t.Data.Size()
-	t.PiecesByBytesLeft = NewList(func(a, b interface{}) bool {
+	t.IncompletePiecesByBytesLeft = NewList(func(a, b interface{}) bool {
 		apb := t.PieceNumPendingBytes(pp.Integer(a.(int)))
 		bpb := t.PieceNumPendingBytes(pp.Integer(b.(int)))
 		if apb < bpb {
@@ -139,9 +168,10 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte
 		piece := &torrentPiece{}
 		util.CopyExact(piece.Hash[:], hash)
 		t.Pieces = append(t.Pieces, piece)
-		piece.bytesLeftElement = t.PiecesByBytesLeft.Insert(index)
+		piece.bytesLeftElement = t.IncompletePiecesByBytesLeft.Insert(index)
 		t.pendAllChunkSpecs(pp.Integer(index))
 	}
+	t.assertIncompletePiecesByBytesLeftOrdering()
 	for _, conn := range t.Conns {
 		if err := conn.setNumPieces(t.NumPieces()); err != nil {
 			log.Printf("closing connection: %s", err)
@@ -391,10 +421,19 @@ func (t *torrent) pendAllChunkSpecs(index pp.Integer) {
 	for _, cs := range t.pieceChunks(int(index)) {
 		pcss[cs] = struct{}{}
 	}
-	t.PiecesByBytesLeft.ValueChanged(piece.bytesLeftElement)
+	t.IncompletePiecesByBytesLeft.ValueChanged(piece.bytesLeftElement)
 	return
 }
 
+func (t *torrent) PieceBytesLeftChanged(index int) {
+	p := t.Pieces[index]
+	if p.Complete() {
+		t.IncompletePiecesByBytesLeft.Remove(p.bytesLeftElement)
+	} else {
+		t.IncompletePiecesByBytesLeft.ValueChanged(p.bytesLeftElement)
+	}
+}
+
 type Peer struct {
 	Id     [20]byte
 	IP     net.IP