]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Do requests synchronously, and don't request from hashing or queued pieces
authorMatt Joiner <anacrolix@gmail.com>
Tue, 26 Jun 2018 04:51:55 +0000 (14:51 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 26 Jun 2018 04:51:55 +0000 (14:51 +1000)
Calculating the desired state was a nice idea, but too hard to debug. This way should also be faster.

connection.go
global.go
piece.go
torrent.go

index cea8dab68b254285aea3eca8142e23d733d79d65..24da542bbd77e40893eeab0a5bfddfd333e56ad4 100644 (file)
@@ -303,9 +303,8 @@ func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
                cn.statusFlags(),
                cn.downloadRate()/(1<<10),
        )
-       roi := cn.pieceRequestOrderIter()
        fmt.Fprintf(w, "    next pieces: %v%s\n",
-               iter.ToSlice(iter.Head(10, roi)),
+               iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
                func() string {
                        if cn.shouldRequestWithoutBias() {
                                return " (fastest)"
@@ -524,33 +523,55 @@ func (cn *connection) request(r request, mw messageWriter) bool {
 }
 
 func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
-       numFillBuffers.Add(1)
-       cancel, new, i := cn.desiredRequestState()
-       if !cn.SetInterested(i, msg) {
-               return
-       }
-       if cancel && len(cn.requests) != 0 {
-               fillBufferSentCancels.Add(1)
-               for r := range cn.requests {
-                       cn.deleteRequest(r)
-                       // log.Printf("%p: cancelling request: %v", cn, r)
-                       if !msg(makeCancelMessage(r)) {
-                               return
+       if !cn.t.networkingEnabled {
+               if !cn.SetInterested(false, msg) {
+                       return
+               }
+               if len(cn.requests) != 0 {
+                       for r := range cn.requests {
+                               cn.deleteRequest(r)
+                               // log.Printf("%p: cancelling request: %v", cn, r)
+                               if !msg(makeCancelMessage(r)) {
+                                       return
+                               }
                        }
                }
        }
-       if len(new) != 0 {
-               fillBufferSentRequests.Add(1)
-               for _, r := range new {
-                       if !cn.request(r, msg) {
-                               // If we didn't completely top up the requests, we shouldn't
-                               // mark the low water, since we'll want to top up the requests
-                               // as soon as we have more write buffer space.
-                               return
-                       }
+       if len(cn.requests) <= cn.requestsLowWater {
+               filledBuffer := false
+               cn.iterPendingPieces(func(pieceIndex int) bool {
+                       cn.iterPendingRequests(pieceIndex, func(r request) bool {
+                               if !cn.SetInterested(true, msg) {
+                                       filledBuffer = true
+                                       return false
+                               }
+                               if len(cn.requests) >= cn.nominalMaxRequests() {
+                                       return false
+                               }
+                               // Choking is looked at here because our interest is dependent
+                               // on whether we'd make requests in its absence.
+                               if cn.PeerChoked {
+                                       if !cn.peerAllowedFast.Get(bitmap.BitIndex(r.Index)) {
+                                               return false
+                                       }
+                               }
+                               if _, ok := cn.requests[r]; ok {
+                                       return true
+                               }
+                               filledBuffer = !cn.request(r, msg)
+                               return !filledBuffer
+                       })
+                       return !filledBuffer
+               })
+               if filledBuffer {
+                       // If we didn't completely top up the requests, we shouldn't mark
+                       // the low water, since we'll want to top up the requests as soon
+                       // as we have more write buffer space.
+                       return
                }
                cn.requestsLowWater = len(cn.requests) / 2
        }
+
        cn.upload(msg)
 }
 
@@ -639,53 +660,6 @@ func (cn *connection) PostBitfield() {
        cn.sentHaves = cn.t.completedPieces.Copy()
 }
 
-// Determines interest and requests to send to a connected peer.
-func nextRequestState(
-       networkingEnabled bool,
-       currentRequests map[request]struct{},
-       peerChoking bool,
-       iterPendingRequests func(f func(request) bool),
-       requestsLowWater int,
-       requestsHighWater int,
-       allowedFast bitmap.Bitmap,
-) (
-       cancelExisting bool, // Cancel all our pending requests
-       newRequests []request, // Chunks to request that we currently aren't
-       interested bool, // Whether we should indicate interest, even if we don't request anything
-) {
-       if !networkingEnabled {
-               return true, nil, false
-       }
-       if len(currentRequests) > requestsLowWater {
-               return false, nil, true
-       }
-       // If we have existing requests, better maintain interest to ensure we get
-       // them. iterPendingRequests might not iterate over outstanding requests.
-       interested = len(currentRequests) != 0
-       iterPendingRequests(func(r request) bool {
-               interested = true
-               if peerChoking {
-                       if allowedFast.IsEmpty() {
-                               return false
-                       }
-                       if !allowedFast.Get(int(r.Index)) {
-                               return true
-                       }
-               }
-               if len(currentRequests)+len(newRequests) >= requestsHighWater {
-                       return false
-               }
-               if _, ok := currentRequests[r]; !ok {
-                       if newRequests == nil {
-                               newRequests = make([]request, 0, requestsHighWater-len(currentRequests))
-                       }
-                       newRequests = append(newRequests, r)
-               }
-               return true
-       })
-       return
-}
-
 func (cn *connection) updateRequests() {
        // log.Print("update requests")
        cn.tickleWriter()
@@ -707,18 +681,26 @@ func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
        }
 }
 
-func (cn *connection) unbiasedPieceRequestOrder() iter.Func {
+func (cn *connection) iterUnbiasedPieceRequestOrder(f func(piece int) bool) bool {
        now, readahead := cn.t.readerPiecePriorities()
        var skip bitmap.Bitmap
        if !cn.peerSentHaveAll {
-               // Pieces to skip include pieces the peer doesn't have
+               // Pieces to skip include pieces the peer doesn't have.
                skip = bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces())
        }
        // And pieces that we already have.
        skip.Union(cn.t.completedPieces)
+       skip.Union(cn.t.piecesQueuedForHash)
        // Return an iterator over the different priority classes, minus the skip
        // pieces.
-       return iter.Chain(
+       return iter.All(
+               func(_piece interface{}) bool {
+                       i := _piece.(pieceIndex)
+                       if cn.t.hashingPiece(i) {
+                               return true
+                       }
+                       return f(i)
+               },
                iterBitmapsDistinct(&skip, now, readahead),
                func(cb iter.Callback) {
                        cn.t.pendingPieces.IterTyped(func(piece int) bool {
@@ -756,49 +738,38 @@ func (cn *connection) shouldRequestWithoutBias() bool {
        return false
 }
 
-func (cn *connection) pieceRequestOrderIter() iter.Func {
+func (cn *connection) iterPendingPieces(f func(int) bool) bool {
        if cn.t.requestStrategy == 3 {
-               return cn.unbiasedPieceRequestOrder()
+               return cn.iterUnbiasedPieceRequestOrder(f)
        }
        if cn.shouldRequestWithoutBias() {
-               return cn.unbiasedPieceRequestOrder()
+               return cn.iterUnbiasedPieceRequestOrder(f)
        } else {
-               return cn.pieceRequestOrder.Iter
-       }
-}
-
-func (cn *connection) iterPendingRequests(f func(request) bool) {
-       cn.pieceRequestOrderIter()(func(_piece interface{}) bool {
-               piece := _piece.(int)
-               return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
-                       r := request{pp.Integer(piece), cs}
-                       if cn.t.requestStrategy == 3 {
-                               lr := cn.t.lastRequested[r]
-                               if !lr.IsZero() {
-                                       if time.Since(lr) < cn.t.duplicateRequestTimeout {
-                                               return true
-                                       } else {
-                                               torrent.Add("requests duplicated due to timeout", 1)
-                                       }
+               return cn.pieceRequestOrder.IterTyped(f)
+       }
+}
+
+func (cn *connection) iterPendingPiecesUntyped(f iter.Callback) {
+       cn.iterPendingPieces(func(i int) bool { return f(i) })
+}
+
+func (cn *connection) iterPendingRequests(piece int, f func(request) bool) bool {
+       return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
+               r := request{pp.Integer(piece), cs}
+               if cn.t.requestStrategy == 3 {
+                       lr := cn.t.lastRequested[r]
+                       if !lr.IsZero() {
+                               if time.Since(lr) < cn.t.duplicateRequestTimeout {
+                                       return true
+                               } else {
+                                       torrent.Add("requests duplicated due to timeout", 1)
                                }
                        }
-                       return f(r)
-               })
+               }
+               return f(r)
        })
 }
 
-func (cn *connection) desiredRequestState() (bool, []request, bool) {
-       return nextRequestState(
-               cn.t.networkingEnabled,
-               cn.requests,
-               cn.PeerChoked,
-               cn.iterPendingRequests,
-               cn.requestsLowWater,
-               cn.nominalMaxRequests(),
-               cn.peerAllowedFast,
-       )
-}
-
 func iterUndirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
        chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
        // TODO: Use "math/rand".Shuffle >= Go 1.10
index 43b84ee192c2b00678bbdfedec2b9e5b05acada4..bbec4b5f93965e0df4db45c61548af0412a5a799 100644 (file)
--- a/global.go
+++ b/global.go
@@ -49,8 +49,4 @@ var (
        pieceInclinationsReused = expvar.NewInt("pieceInclinationsReused")
        pieceInclinationsNew    = expvar.NewInt("pieceInclinationsNew")
        pieceInclinationsPut    = expvar.NewInt("pieceInclinationsPut")
-
-       fillBufferSentCancels  = expvar.NewInt("fillBufferSentCancels")
-       fillBufferSentRequests = expvar.NewInt("fillBufferSentRequests")
-       numFillBuffers         = expvar.NewInt("numFillBuffers")
 )
index d29d1dc88accf89e0ee7023e30c07a1bc27cc071..5e0c82a80f5c96a0a855ea4960d490d47e6760aa 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -215,7 +215,7 @@ func (p *Piece) SetPriority(prio piecePriority) {
 }
 
 func (p *Piece) uncachedPriority() (ret piecePriority) {
-       if p.t.pieceComplete(p.index) {
+       if p.t.pieceComplete(p.index) || p.t.pieceQueuedForHash(p.index) || p.t.hashingPiece(p.index) {
                return PiecePriorityNone
        }
        for _, f := range p.files {
index 5736dee5ff7ca891180a499b308014835d9fcf23..159139792fbe17455982a765bd86aba199658cb3 100644 (file)
@@ -1675,7 +1675,7 @@ func (t *Torrent) onIncompletePiece(piece int) {
        }
 }
 
-func (t *Torrent) verifyPiece(piece int) {
+func (t *Torrent) verifyPiece(piece pieceIndex) {
        cl := t.cl
        cl.mu.Lock()
        defer cl.mu.Unlock()
@@ -1690,18 +1690,20 @@ func (t *Torrent) verifyPiece(piece int) {
        if !p.t.piecesQueuedForHash.Remove(piece) {
                panic("piece was not queued")
        }
+       t.updatePiecePriority(piece)
        if t.closed.IsSet() || t.pieceComplete(piece) {
-               t.updatePiecePriority(piece)
                return
        }
        p.hashing = true
        t.publishPieceChange(piece)
+       t.updatePiecePriority(piece)
        t.storageLock.RLock()
        cl.mu.Unlock()
        sum := t.hashPiece(piece)
        t.storageLock.RUnlock()
        cl.mu.Lock()
        p.hashing = false
+       t.updatePiecePriority(piece)
        t.pieceHashed(piece, sum == p.hash)
        t.publishPieceChange(piece)
 }
@@ -1732,6 +1734,7 @@ func (t *Torrent) queuePieceCheck(pieceIndex int) {
        }
        t.piecesQueuedForHash.Add(pieceIndex)
        t.publishPieceChange(pieceIndex)
+       t.updatePiecePriority(pieceIndex)
        go t.verifyPiece(pieceIndex)
 }