From 6aad8041abfa3036a030388803912d58ee336899 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sat, 23 Sep 2017 15:25:47 +1000 Subject: [PATCH] Implement new request strategy The new strategy, 2, has the fastest connection download by priority in order, and all other pieces stick to a randomized ordering that's stable per connection. --- client.go | 1 + connection.go | 120 +++++++++++++++++++++++++++++++++++++------------- misc_test.go | 11 +++++ torrent.go | 5 ++- 4 files changed, 106 insertions(+), 31 deletions(-) diff --git a/client.go b/client.go index 781ebb14..74f8a0d5 100644 --- a/client.go +++ b/client.go @@ -1223,6 +1223,7 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) ( maxEstablishedConns: defaultEstablishedConnsPerTorrent, networkingEnabled: true, + requestStrategy: 2, } t.setChunkSize(defaultChunkSize) return diff --git a/connection.go b/connection.go index 774e55f1..ba1d1e1e 100644 --- a/connection.go +++ b/connection.go @@ -211,18 +211,16 @@ func (cn *connection) WriteStatus(w io.Writer, t *Torrent) { len(cn.PeerRequests), cn.statusFlags(), ) - fmt.Fprintf(w, " next pieces: %v\n", priorityBitmapHeadAsSlice(&cn.pieceRequestOrder, 10)) -} - -func priorityBitmapHeadAsSlice(pb *prioritybitmap.PriorityBitmap, n int) (ret []int) { - pb.IterTyped(func(i int) bool { - if len(ret) >= n { - return false - } - ret = append(ret, i) - return true - }) - return + roi := cn.pieceRequestOrderIter() + fmt.Fprintf(w, " next pieces: %v%s\n", + iter.ToSlice(iter.Head(10, roi)), + func() string { + if cn.shouldRequestWithoutBias() { + return " (fastest)" + } else { + return "" + } + }()) } func (cn *connection) Close() { @@ -472,22 +470,27 @@ func nextRequestState( networkingEnabled bool, currentRequests map[request]struct{}, peerChoking bool, - nextPieces *prioritybitmap.PriorityBitmap, + requestPieces iter.Func, pendingChunks func(piece int, f func(chunkSpec) bool) bool, requestsLowWater int, requestsHighWater int, ) ( - cancelExisting bool, - newRequests []request, - interested bool, + cancelExisting bool, // Cancel all our pending requests + newRequests []request, // Chunks to request that we currently aren't + interested bool, // Whether we should indicate interest, even if we don't request anything ) { - if !networkingEnabled || nextPieces.IsEmpty() { + if !networkingEnabled { return true, nil, false } - if peerChoking || len(currentRequests) > requestsLowWater { - return false, nil, !nextPieces.IsEmpty() + if len(currentRequests) > requestsLowWater { + return false, nil, true } - nextPieces.IterTyped(func(piece int) bool { + requestPieces(func(_piece interface{}) bool { + interested = true + if peerChoking { + return false + } + piece := _piece.(int) return pendingChunks(piece, func(cs chunkSpec) bool { r := request{pp.Integer(piece), cs} if _, ok := currentRequests[r]; !ok { @@ -499,19 +502,68 @@ func nextRequestState( return len(currentRequests)+len(newRequests) < requestsHighWater }) }) - return false, newRequests, true + return } func (cn *connection) updateRequests() { cn.tickleWriter() } +func iterBitmapsDistinct(skip bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func { + return func(cb iter.Callback) { + for _, bm := range bms { + if !iter.All(func(i interface{}) bool { + skip.Add(i.(int)) + return cb(i) + }, bitmap.Sub(bm, skip).Iter) { + return + } + } + } +} + +func (cn *connection) unbiasedPieceRequestOrder() iter.Func { + now, readahead := cn.t.readerPiecePriorities() + return iterBitmapsDistinct(cn.t.completedPieces.Copy(), now, readahead, cn.t.pendingPieces) +} + +// The connection should download highest priority pieces first, without any +// inclination toward avoiding wastage. Generally we might do this if there's +// a single connection, or this is the fastest connection, and we have active +// readers that signal an ordering preference. It's conceivable that the best +// connection should do this, since it's least likely to waste our time if +// assigned to the highest priority pieces, and assigning more than one this +// role would cause significant wasted bandwidth. +func (cn *connection) shouldRequestWithoutBias() bool { + if cn.t.requestStrategy != 2 { + return false + } + if len(cn.t.readers) == 0 { + return false + } + if len(cn.t.conns) == 1 { + return true + } + if cn == cn.t.fastestConn { + return true + } + return false +} + +func (cn *connection) pieceRequestOrderIter() iter.Func { + if cn.shouldRequestWithoutBias() { + return cn.unbiasedPieceRequestOrder() + } else { + return cn.pieceRequestOrder.Iter + } +} + func (cn *connection) desiredRequestState() (bool, []request, bool) { return nextRequestState( cn.t.networkingEnabled, cn.requests, cn.PeerChoked, - &cn.pieceRequestOrder, + cn.pieceRequestOrderIter(), func(piece int, f func(chunkSpec) bool) bool { return undirtiedChunks(piece, cn.t, f) }, @@ -545,16 +597,20 @@ func (cn *connection) updatePiecePriority(piece int) bool { return cn.stopRequestingPiece(piece) } prio := cn.getPieceInclination()[piece] - switch tpp { - case PiecePriorityNormal: - case PiecePriorityReadahead: - prio -= cn.t.numPieces() - case PiecePriorityNext, PiecePriorityNow: - prio -= 2 * cn.t.numPieces() + switch cn.t.requestStrategy { + case 1: + switch tpp { + case PiecePriorityNormal: + case PiecePriorityReadahead: + prio -= cn.t.numPieces() + case PiecePriorityNext, PiecePriorityNow: + prio -= 2 * cn.t.numPieces() + default: + panic(tpp) + } + prio += piece / 3 default: - panic(tpp) } - prio += piece / 3 return cn.pieceRequestOrder.Set(piece, prio) } @@ -966,6 +1022,10 @@ func (c *connection) receiveChunk(msg *pp.Message) { c.UsefulChunksReceived++ c.lastUsefulChunkReceived = time.Now() + if t.fastestConn != c { + // log.Printf("setting fastest connection %p", c) + } + t.fastestConn = c // Need to record that it hasn't been written yet, before we attempt to do // anything with it. diff --git a/misc_test.go b/misc_test.go index 17ee3133..8bb23c35 100644 --- a/misc_test.go +++ b/misc_test.go @@ -3,6 +3,8 @@ package torrent import ( "testing" + "github.com/anacrolix/missinggo/bitmap" + "github.com/anacrolix/missinggo/iter" "github.com/stretchr/testify/assert" ) @@ -17,3 +19,12 @@ func TestTorrentOffsetRequest(t *testing.T) { check(13, 5, 11, newRequest(2, 0, 3), true) check(13, 5, 13, request{}, false) } + +func TestIterBitmapsDistinct(t *testing.T) { + var skip, first, second bitmap.Bitmap + skip.Add(1) + first.Add(1, 0, 3) + second.Add(1, 2, 0) + assert.Equal(t, []interface{}{0, 3, 2}, iter.ToSlice(iterBitmapsDistinct(skip.Copy(), first, second))) + assert.Equal(t, []int{1}, skip.ToSortedSlice()) +} diff --git a/torrent.go b/torrent.go index cb838ef4..c09a4d0b 100644 --- a/torrent.go +++ b/torrent.go @@ -45,6 +45,7 @@ type Torrent struct { cl *Client networkingEnabled bool + requestStrategy int closed missinggo.Event infoHash metainfo.Hash @@ -68,12 +69,14 @@ type Torrent struct { // The info dict. nil if we don't have it (yet). info *metainfo.Info + // Active peer connections, running message stream loops. conns map[*connection]struct{} maxEstablishedConns int // Set of addrs to which we're attempting to connect. Connections are // half-open until all handshakes are completed. - halfOpen map[string]Peer + halfOpen map[string]Peer + fastestConn *connection // Reserve of peers to connect to. A peer can be both here and in the // active connections if were told about the peer after connecting with -- 2.48.1