]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Implement new request strategy
authorMatt Joiner <anacrolix@gmail.com>
Sat, 23 Sep 2017 05:25:47 +0000 (15:25 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sat, 23 Sep 2017 05:25:47 +0000 (15:25 +1000)
The new strategy, 2, has the fastest connection download by priority in order, and all other pieces stick to a randomized ordering that's stable per connection.

client.go
connection.go
misc_test.go
torrent.go

index 781ebb14c760036169bf093d1037cfffd90b0b34..74f8a0d5358c638a5e297cc95e3559d12e3ba07c 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1223,6 +1223,7 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (
                maxEstablishedConns: defaultEstablishedConnsPerTorrent,
 
                networkingEnabled: true,
+               requestStrategy:   2,
        }
        t.setChunkSize(defaultChunkSize)
        return
index 774e55f1b196e2596561d1acd6a63458c39e9254..ba1d1e1e7c42e02621abeaa41f01a09d58e77885 100644 (file)
@@ -211,18 +211,16 @@ func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
                len(cn.PeerRequests),
                cn.statusFlags(),
        )
-       fmt.Fprintf(w, "    next pieces: %v\n", priorityBitmapHeadAsSlice(&cn.pieceRequestOrder, 10))
-}
-
-func priorityBitmapHeadAsSlice(pb *prioritybitmap.PriorityBitmap, n int) (ret []int) {
-       pb.IterTyped(func(i int) bool {
-               if len(ret) >= n {
-                       return false
-               }
-               ret = append(ret, i)
-               return true
-       })
-       return
+       roi := cn.pieceRequestOrderIter()
+       fmt.Fprintf(w, "    next pieces: %v%s\n",
+               iter.ToSlice(iter.Head(10, roi)),
+               func() string {
+                       if cn.shouldRequestWithoutBias() {
+                               return " (fastest)"
+                       } else {
+                               return ""
+                       }
+               }())
 }
 
 func (cn *connection) Close() {
@@ -472,22 +470,27 @@ func nextRequestState(
        networkingEnabled bool,
        currentRequests map[request]struct{},
        peerChoking bool,
-       nextPieces *prioritybitmap.PriorityBitmap,
+       requestPieces iter.Func,
        pendingChunks func(piece int, f func(chunkSpec) bool) bool,
        requestsLowWater int,
        requestsHighWater int,
 ) (
-       cancelExisting bool,
-       newRequests []request,
-       interested bool,
+       cancelExisting bool, // Cancel all our pending requests
+       newRequests []request, // Chunks to request that we currently aren't
+       interested bool, // Whether we should indicate interest, even if we don't request anything
 ) {
-       if !networkingEnabled || nextPieces.IsEmpty() {
+       if !networkingEnabled {
                return true, nil, false
        }
-       if peerChoking || len(currentRequests) > requestsLowWater {
-               return false, nil, !nextPieces.IsEmpty()
+       if len(currentRequests) > requestsLowWater {
+               return false, nil, true
        }
-       nextPieces.IterTyped(func(piece int) bool {
+       requestPieces(func(_piece interface{}) bool {
+               interested = true
+               if peerChoking {
+                       return false
+               }
+               piece := _piece.(int)
                return pendingChunks(piece, func(cs chunkSpec) bool {
                        r := request{pp.Integer(piece), cs}
                        if _, ok := currentRequests[r]; !ok {
@@ -499,19 +502,68 @@ func nextRequestState(
                        return len(currentRequests)+len(newRequests) < requestsHighWater
                })
        })
-       return false, newRequests, true
+       return
 }
 
 func (cn *connection) updateRequests() {
        cn.tickleWriter()
 }
 
+func iterBitmapsDistinct(skip bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
+       return func(cb iter.Callback) {
+               for _, bm := range bms {
+                       if !iter.All(func(i interface{}) bool {
+                               skip.Add(i.(int))
+                               return cb(i)
+                       }, bitmap.Sub(bm, skip).Iter) {
+                               return
+                       }
+               }
+       }
+}
+
+func (cn *connection) unbiasedPieceRequestOrder() iter.Func {
+       now, readahead := cn.t.readerPiecePriorities()
+       return iterBitmapsDistinct(cn.t.completedPieces.Copy(), now, readahead, cn.t.pendingPieces)
+}
+
+// The connection should download highest priority pieces first, without any
+// inclination toward avoiding wastage. Generally we might do this if there's
+// a single connection, or this is the fastest connection, and we have active
+// readers that signal an ordering preference. It's conceivable that the best
+// connection should do this, since it's least likely to waste our time if
+// assigned to the highest priority pieces, and assigning more than one this
+// role would cause significant wasted bandwidth.
+func (cn *connection) shouldRequestWithoutBias() bool {
+       if cn.t.requestStrategy != 2 {
+               return false
+       }
+       if len(cn.t.readers) == 0 {
+               return false
+       }
+       if len(cn.t.conns) == 1 {
+               return true
+       }
+       if cn == cn.t.fastestConn {
+               return true
+       }
+       return false
+}
+
+func (cn *connection) pieceRequestOrderIter() iter.Func {
+       if cn.shouldRequestWithoutBias() {
+               return cn.unbiasedPieceRequestOrder()
+       } else {
+               return cn.pieceRequestOrder.Iter
+       }
+}
+
 func (cn *connection) desiredRequestState() (bool, []request, bool) {
        return nextRequestState(
                cn.t.networkingEnabled,
                cn.requests,
                cn.PeerChoked,
-               &cn.pieceRequestOrder,
+               cn.pieceRequestOrderIter(),
                func(piece int, f func(chunkSpec) bool) bool {
                        return undirtiedChunks(piece, cn.t, f)
                },
@@ -545,16 +597,20 @@ func (cn *connection) updatePiecePriority(piece int) bool {
                return cn.stopRequestingPiece(piece)
        }
        prio := cn.getPieceInclination()[piece]
-       switch tpp {
-       case PiecePriorityNormal:
-       case PiecePriorityReadahead:
-               prio -= cn.t.numPieces()
-       case PiecePriorityNext, PiecePriorityNow:
-               prio -= 2 * cn.t.numPieces()
+       switch cn.t.requestStrategy {
+       case 1:
+               switch tpp {
+               case PiecePriorityNormal:
+               case PiecePriorityReadahead:
+                       prio -= cn.t.numPieces()
+               case PiecePriorityNext, PiecePriorityNow:
+                       prio -= 2 * cn.t.numPieces()
+               default:
+                       panic(tpp)
+               }
+               prio += piece / 3
        default:
-               panic(tpp)
        }
-       prio += piece / 3
        return cn.pieceRequestOrder.Set(piece, prio)
 }
 
@@ -966,6 +1022,10 @@ func (c *connection) receiveChunk(msg *pp.Message) {
 
        c.UsefulChunksReceived++
        c.lastUsefulChunkReceived = time.Now()
+       if t.fastestConn != c {
+               // log.Printf("setting fastest connection %p", c)
+       }
+       t.fastestConn = c
 
        // Need to record that it hasn't been written yet, before we attempt to do
        // anything with it.
index 17ee31338e36ee500682759b7de1eb5e7379919c..8bb23c35365dfe20e19446b695e67f951e4d4cae 100644 (file)
@@ -3,6 +3,8 @@ package torrent
 import (
        "testing"
 
+       "github.com/anacrolix/missinggo/bitmap"
+       "github.com/anacrolix/missinggo/iter"
        "github.com/stretchr/testify/assert"
 )
 
@@ -17,3 +19,12 @@ func TestTorrentOffsetRequest(t *testing.T) {
        check(13, 5, 11, newRequest(2, 0, 3), true)
        check(13, 5, 13, request{}, false)
 }
+
+func TestIterBitmapsDistinct(t *testing.T) {
+       var skip, first, second bitmap.Bitmap
+       skip.Add(1)
+       first.Add(1, 0, 3)
+       second.Add(1, 2, 0)
+       assert.Equal(t, []interface{}{0, 3, 2}, iter.ToSlice(iterBitmapsDistinct(skip.Copy(), first, second)))
+       assert.Equal(t, []int{1}, skip.ToSortedSlice())
+}
index cb838ef4e1ae3c8c451b5a906f499e3363e6cc18..c09a4d0b3eb8016c052941039712eb936f6732e4 100644 (file)
@@ -45,6 +45,7 @@ type Torrent struct {
        cl *Client
 
        networkingEnabled bool
+       requestStrategy   int
 
        closed   missinggo.Event
        infoHash metainfo.Hash
@@ -68,12 +69,14 @@ type Torrent struct {
 
        // The info dict. nil if we don't have it (yet).
        info *metainfo.Info
+
        // Active peer connections, running message stream loops.
        conns               map[*connection]struct{}
        maxEstablishedConns int
        // Set of addrs to which we're attempting to connect. Connections are
        // half-open until all handshakes are completed.
-       halfOpen map[string]Peer
+       halfOpen    map[string]Peer
+       fastestConn *connection
 
        // Reserve of peers to connect to. A peer can be both here and in the
        // active connections if were told about the peer after connecting with