Sergey Matveev's repositories - btrtrc.git/commitdiff
Rework to improve work stealing and try to thread peers through all request pieces
author    Matt Joiner <anacrolix@gmail.com>
          Wed, 12 May 2021 07:45:36 +0000 (17:45 +1000)
committer Matt Joiner <anacrolix@gmail.com>
          Mon, 7 Jun 2021 03:01:39 +0000 (13:01 +1000)
misc.go
peerconn.go
piece.go
request-strategy.go

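In outline, the commit replaces per-connection request dribbling with one planning pass over every torrent: each piece is pushed into a single order sorted by priority, partial state, availability, and index, and each pending chunk of a wanted piece is then offered to the torrent's peers, re-sorted per chunk so that peers with spare queue space, fewer requests already in this piece, and fewer other requestable pieces remaining come first. That last key is the work stealing: a peer that can serve only this piece outranks a faster peer that has alternatives. The planned sets are applied at the end by cancelling and issuing the difference against each peer's live requests. A minimal sketch of the per-chunk assignment, using stand-in types rather than the package's Torrent, Peer, and Request:

    package main

    import (
        "fmt"
        "sort"
    )

    type chunk struct{ piece, begin int }

    type fakePeer struct {
        name            string
        queued          int // requests assigned so far this pass
        inPiece         int // requests assigned within the current piece
        piecesRemaining int // other requestable pieces this peer could serve
        maxQueue        int // cf. nominalMaxRequests in the real code
    }

    // assignChunks mirrors the shape of the new doRequests loop: for each
    // chunk, re-sort the peers and hand the chunk to the best-placed one.
    func assignChunks(chunks []chunk, peers []*fakePeer) map[string][]chunk {
        out := map[string][]chunk{}
        for _, c := range chunks {
            sort.Slice(peers, func(i, j int) bool {
                pi, pj := peers[i], peers[j]
                if fits := pi.queued < pi.maxQueue; fits != (pj.queued < pj.maxQueue) {
                    return fits // peers with queue space first
                }
                if pi.inPiece != pj.inPiece {
                    return pi.inPiece < pj.inPiece // spread chunks within a piece
                }
                // Fewer alternatives first: this is the stealing bias.
                return pi.piecesRemaining < pj.piecesRemaining
            })
            for _, p := range peers {
                if p.queued < p.maxQueue {
                    out[p.name] = append(out[p.name], c)
                    p.queued++
                    p.inPiece++
                    break
                }
            }
        }
        return out
    }

    func main() {
        peers := []*fakePeer{
            {name: "fast", piecesRemaining: 10, maxQueue: 4},
            {name: "slow", piecesRemaining: 1, maxQueue: 4},
        }
        fmt.Println(assignChunks([]chunk{{0, 0}, {0, 16384}, {0, 32768}}, peers))
    }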
diff --git a/misc.go b/misc.go
index 5ad01d8594b7ab037fbc080516d61fdf7ab4e3c2..0cd85a3f0d58aedbb79844f9197599bf7e951a4d 100644 (file)
--- a/misc.go
+++ b/misc.go
@@ -151,6 +151,16 @@ func min(as ...int64) int64 {
        return ret
 }
 
+func minInt(as ...int) int {
+       ret := as[0]
+       for _, a := range as[1:] {
+               if a < ret {
+                       ret = a
+               }
+       }
+       return ret
+}
+
 var unlimited = rate.NewLimiter(rate.Inf, 0)
 
 type (
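For context, minInt mirrors the varargs int64 min directly above, and like it panics if called with no arguments. A hypothetical call site (the names here are illustrative, not from the codebase):

    // Cap a batch by several independent limits at once.
    n := minInt(len(pending), maxPerPeer, queueSpace)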
diff --git a/peerconn.go b/peerconn.go
index 7e20ba3bf19ac7fc4478eb868b1c1c09ab13023c..1721663c56ec3fbcd469a2ccff7e8093018deae8 100644 (file)
--- a/peerconn.go
+++ b/peerconn.go
@@ -173,17 +173,18 @@ func (cn *Peer) expectingChunks() bool {
        if !cn.interested {
                return false
        }
-       if !cn.peerChoking {
-               return true
-       }
        for r := range cn.requests {
-               if cn.peerAllowedFast.Contains(bitmap.BitIndex(r.Index)) {
+               if !cn.remoteChokingPiece(r.Index.Int()) {
                        return true
                }
        }
        return false
 }
 
+func (cn *Peer) remoteChokingPiece(piece pieceIndex) bool {
+       return cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(piece))
+}
+
 // Returns true if the connection is over IPv6.
 func (cn *PeerConn) ipv6() bool {
        ip := cn.remoteIp()
@@ -542,13 +543,7 @@ func (pc *PeerConn) writeInterested(interested bool) bool {
 // are okay.
 type messageWriter func(pp.Message) bool
 
-func (cn *Peer) request(r Request) error {
-       if _, ok := cn.requests[r]; ok {
-               return nil
-       }
-       if cn.numLocalRequests() >= cn.nominalMaxRequests() {
-               return errors.New("too many outstanding requests")
-       }
+func (cn *Peer) shouldRequest(r Request) error {
        if !cn.peerHasPiece(pieceIndex(r.Index)) {
                return errors.New("requesting piece peer doesn't have")
        }
@@ -564,15 +559,18 @@ func (cn *Peer) request(r Request) error {
        if cn.t.pieceQueuedForHash(pieceIndex(r.Index)) {
                panic("piece is queued for hash")
        }
-       if !cn.setInterested(true) {
-               return errors.New("write buffer full after expressing interest")
+       return nil
+}
+
+func (cn *Peer) request(r Request) error {
+       if err := cn.shouldRequest(r); err != nil {
+               panic(err)
        }
-       if cn.peerChoking {
-               if cn.peerAllowedFast.Get(int(r.Index)) {
-                       torrent.Add("allowed fast requests sent", 1)
-               } else {
-                       errors.New("peer choking and piece not in allowed fast set")
-               }
+       if _, ok := cn.requests[r]; ok {
+               return nil
+       }
+       if cn.numLocalRequests() >= cn.nominalMaxRequests() {
+               return errors.New("too many outstanding requests")
        }
        if cn.requests == nil {
                cn.requests = make(map[Request]struct{})
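The request path is split so that the validity checks live in shouldRequest, and request now panics rather than returning an error when they fail: the new planner in request-strategy.go is expected to propose only requests that already pass those checks, so a violation is a programming error rather than a runtime condition. remoteChokingPiece likewise centralises the BEP 6 rule that a remote choke only blocks pieces outside the allowed-fast set. A tiny restatement of that rule over stand-in values:

    package main

    import "fmt"

    // A remote choke blocks a piece only if the piece is not in the
    // peer's allowed-fast set (BEP 6); stand-in types for illustration.
    func remoteChokingPiece(peerChoking bool, allowedFast map[int]bool, piece int) bool {
        return peerChoking && !allowedFast[piece]
    }

    func main() {
        fast := map[int]bool{3: true}
        fmt.Println(remoteChokingPiece(true, fast, 3)) // false: requestable while choked
        fmt.Println(remoteChokingPiece(true, fast, 5)) // true: blocked
    }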
diff --git a/piece.go b/piece.go
index cee6b7d01afc235eec3af73a56d0fe07843b2f76..af9607c4cd10ac6ea011d60018e6865242208dcb 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -222,10 +222,7 @@ func (p *Piece) SetPriority(prio piecePriority) {
        p.t.updatePiecePriority(p.index)
 }
 
-func (p *Piece) uncachedPriority() (ret piecePriority) {
-       if p.t.pieceComplete(p.index) || p.t.pieceQueuedForHash(p.index) || p.t.hashingPiece(p.index) {
-               return PiecePriorityNone
-       }
+func (p *Piece) purePriority() (ret piecePriority) {
        for _, f := range p.files {
                ret.Raise(f.prio)
        }
@@ -242,6 +239,13 @@ func (p *Piece) uncachedPriority() (ret piecePriority) {
        return
 }
 
+func (p *Piece) uncachedPriority() (ret piecePriority) {
+       if p.t.pieceComplete(p.index) || p.t.pieceQueuedForHash(p.index) || p.t.hashingPiece(p.index) {
+               return PiecePriorityNone
+       }
+       return p.purePriority()
+}
+
 // Tells the Client to refetch the completion status from storage, updating priority etc. if
 // necessary. Might be useful if you know the state of the piece data has changed externally.
 func (p *Piece) UpdateCompletion() {
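Splitting purePriority out of uncachedPriority lets the planner rank every piece by file priority and urgency alone, while uncachedPriority keeps masking complete, queued-for-hash, and hashing pieces for the rest of the client. The doc comment above UpdateCompletion describes an exported hook; a hypothetical helper built on it (assuming the package's exported Piece and NumPieces accessors):

    package torrentutil

    import "github.com/anacrolix/torrent"

    // Hypothetical: re-read completion state from storage for every piece,
    // e.g. after the piece data was modified outside the client.
    func updateAllCompletions(t *torrent.Torrent) {
        for i := 0; i < t.NumPieces(); i++ {
            t.Piece(i).UpdateCompletion()
        }
    }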
diff --git a/request-strategy.go b/request-strategy.go
index 1f906510d4dc7a5c9501d0b4e4313b66bcfffe0e..5daa7de13c00397126838a91795e40ebb69fe46f 100644 (file)
--- a/request-strategy.go
+++ b/request-strategy.go
@@ -20,45 +20,17 @@ type pieceRequestOrderPiece struct {
        prio         piecePriority
        partial      bool
        availability int64
-}
-
-func (me *clientPieceRequestOrder) addPieces(t *Torrent, numPieces pieceIndex) {
-       for i := range iter.N(numPieces) {
-               me.pieces = append(me.pieces, pieceRequestOrderPiece{
-                       t:     t,
-                       index: i,
-               })
-       }
+       request      bool
 }
 
 func (me *clientPieceRequestOrder) Len() int {
        return len(me.pieces)
 }
 
-func (me *clientPieceRequestOrder) removePieces(t *Torrent) {
-       newPieces := make([]pieceRequestOrderPiece, 0, len(me.pieces)-t.numPieces())
-       for _, p := range me.pieces {
-               if p.t != t {
-                       newPieces = append(newPieces, p)
-               }
-       }
-       me.pieces = newPieces
-}
-
 func (me clientPieceRequestOrder) sort() {
        sort.Slice(me.pieces, me.less)
 }
 
-func (me *clientPieceRequestOrder) update() {
-       for i := range me.pieces {
-               p := &me.pieces[i]
-               tp := p.t.piece(p.index)
-               p.prio = tp.uncachedPriority()
-               p.partial = p.t.piecePartiallyDownloaded(p.index)
-               p.availability = tp.availability
-       }
-}
-
 func (me clientPieceRequestOrder) less(_i, _j int) bool {
        i := me.pieces[_i]
        j := me.pieces[_j]
@@ -66,9 +38,7 @@ func (me clientPieceRequestOrder) less(_i, _j int) bool {
                int(j.prio), int(i.prio),
        ).Bool(
                j.partial, i.partial,
-       ).Int64(
-               i.availability, j.availability,
-       ).Less()
+       ).Int64(i.availability, j.availability).Int(i.index, j.index).Less()
 }
 
 func (cl *Client) requester() {
@@ -86,45 +56,111 @@ func (cl *Client) requester() {
        }
 }
 
+type requestsPeer struct {
+       cur                        *Peer
+       nextRequests               map[Request]struct{}
+       nextInterest               bool
+       requestablePiecesRemaining int
+}
+
+func (rp *requestsPeer) canRequestPiece(p pieceIndex) bool {
+       return rp.hasPiece(p) && (!rp.choking() || rp.pieceAllowedFast(p))
+}
+
+func (rp *requestsPeer) hasPiece(i pieceIndex) bool {
+       return rp.cur.peerHasPiece(i)
+}
+
+func (rp *requestsPeer) pieceAllowedFast(p pieceIndex) bool {
+       return rp.cur.peerAllowedFast.Contains(p)
+}
+
+func (rp *requestsPeer) choking() bool {
+       return rp.cur.peerChoking
+}
+
+func (rp *requestsPeer) hasExistingRequest(r Request) bool {
+       _, ok := rp.cur.requests[r]
+       return ok
+}
+
+func (rp *requestsPeer) canFitRequest() bool {
+       return len(rp.nextRequests) < rp.cur.nominalMaxRequests()
+}
+
+// Returns true if it is added and wasn't there before.
+func (rp *requestsPeer) addNextRequest(r Request) bool {
+       _, ok := rp.nextRequests[r]
+       if ok {
+               return false
+       }
+       rp.nextRequests[r] = struct{}{}
+       return true
+}
+
+type peersForPieceRequests struct {
+       requestsInPiece int
+       *requestsPeer
+}
+
+func (me *peersForPieceRequests) addNextRequest(r Request) {
+       if !me.requestsPeer.addNextRequest(r) {
+               return
+       }
+       me.requestsInPiece++
+}
+
 func (cl *Client) doRequests() {
        requestOrder := &cl.pieceRequestOrder
        requestOrder.pieces = requestOrder.pieces[:0]
-       allPeers := make(map[*Torrent][]*Peer)
+       allPeers := make(map[*Torrent][]*requestsPeer)
        // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
        // TorrentImpl.
        storageLeft := make(map[*func() *int64]*int64)
        for _, t := range cl.torrents {
                // TODO: We could do metainfo requests here.
-               if t.haveInfo() {
-                       key := t.storage.Capacity
-                       if key != nil {
-                               if _, ok := storageLeft[key]; !ok {
-                                       storageLeft[key] = (*key)()
-                               }
+               if !t.haveInfo() {
+                       continue
+               }
+               key := t.storage.Capacity
+               if key != nil {
+                       if _, ok := storageLeft[key]; !ok {
+                               storageLeft[key] = (*key)()
                        }
-                       requestOrder.addPieces(t, t.numPieces())
                }
-               var peers []*Peer
+               var peers []*requestsPeer
                t.iterPeers(func(p *Peer) {
                        if !p.closed.IsSet() {
-                               peers = append(peers, p)
+                               peers = append(peers, &requestsPeer{
+                                       cur:          p,
+                                       nextRequests: make(map[Request]struct{}),
+                               })
                        }
                })
-               // Sort in *desc* order, approximately the reverse of worseConn where appropriate.
-               sort.Slice(peers, func(i, j int) bool {
-                       return multiless.New().Float64(
-                               peers[j].downloadRate(), peers[i].downloadRate(),
-                       ).Uintptr(
-                               uintptr(unsafe.Pointer(peers[j])), uintptr(unsafe.Pointer(peers[i]))).Less()
-               })
+               for i := range iter.N(t.numPieces()) {
+                       tp := t.piece(i)
+                       pp := tp.purePriority()
+                       request := !t.ignorePieceForRequests(i)
+                       requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{
+                               t:            t,
+                               index:        i,
+                               prio:         pp,
+                               partial:      t.piecePartiallyDownloaded(i),
+                               availability: tp.availability,
+                               request:      request,
+                       })
+                       if request {
+                               for _, p := range peers {
+                                       if p.canRequestPiece(i) {
+                                               p.requestablePiecesRemaining++
+                                       }
+                               }
+                       }
+               }
                allPeers[t] = peers
        }
-       requestOrder.update()
        requestOrder.sort()
-       // For a given piece, the set of allPeers indices that absorbed requests for the piece.
-       contributed := make(map[int]struct{})
        for _, p := range requestOrder.pieces {
-               peers := allPeers[p.t]
                torrentPiece := p.t.piece(p.index)
                if left := storageLeft[p.t.storage.Capacity]; left != nil {
                        if *left < int64(torrentPiece.length()) {
@@ -132,53 +168,104 @@ func (cl *Client) doRequests() {
                        }
                        *left -= int64(torrentPiece.length())
                }
-               if p.t.ignorePieceForRequests(p.index) {
+               if !p.request {
                        continue
                }
-               p.t.piece(p.index).iterUndirtiedChunks(func(chunk ChunkSpec) bool {
+               peersForPiece := make([]*peersForPieceRequests, 0, len(allPeers[p.t]))
+               for _, peer := range allPeers[p.t] {
+                       peersForPiece = append(peersForPiece, &peersForPieceRequests{
+                               requestsInPiece: 0,
+                               requestsPeer:    peer,
+                       })
+               }
+               sortPeersForPiece := func() {
+                       sort.Slice(peersForPiece, func(i, j int) bool {
+                               return multiless.New().Bool(
+                                       peersForPiece[j].canFitRequest(),
+                                       peersForPiece[i].canFitRequest(),
+                               ).Int(
+                                       peersForPiece[i].requestsInPiece,
+                                       peersForPiece[j].requestsInPiece,
+                               ).Int(
+                                       peersForPiece[i].requestablePiecesRemaining,
+                                       peersForPiece[j].requestablePiecesRemaining,
+                               ).Float64(
+                                       peersForPiece[j].cur.downloadRate(),
+                                       peersForPiece[i].cur.downloadRate(),
+                               ).EagerSameLess(
+                                       peersForPiece[i].cur.completedHandshake.Equal(peersForPiece[j].cur.completedHandshake),
+                                       peersForPiece[i].cur.completedHandshake.Before(peersForPiece[j].cur.completedHandshake),
+                                       // TODO: Probably peer priority can come next
+                               ).Uintptr(
+                                       uintptr(unsafe.Pointer(peersForPiece[j].cur)),
+                                       uintptr(unsafe.Pointer(peersForPiece[i].cur)),
+                               ).Less()
+                       })
+               }
+               pendingChunksRemaining := int(p.t.pieceNumPendingChunks(p.index))
+               torrentPiece.iterUndirtiedChunks(func(chunk ChunkSpec) bool {
                        req := Request{pp.Integer(p.index), chunk}
-                       const skipAlreadyRequested = false
-                       if skipAlreadyRequested {
-                               alreadyRequested := false
-                               p.t.iterPeers(func(p *Peer) {
-                                       if _, ok := p.requests[req]; ok {
-                                               alreadyRequested = true
-                                       }
-                               })
-                               if alreadyRequested {
+                       pendingChunksRemaining--
+                       sortPeersForPiece()
+                       for i, peer := range peersForPiece {
+                               if i > pendingChunksRemaining {
+                                       break
+                               }
+                               if peer.hasExistingRequest(req) && peer.canFitRequest() {
+                                       peer.addNextRequest(req)
                                        return true
                                }
                        }
-                       alreadyRequested := false
-                       for peerIndex, peer := range peers {
-                               if alreadyRequested {
-                                       // Cancel all requests from "slower" peers after the one that requested it.
-                                       peer.cancel(req)
-                               } else {
-                                       err := peer.request(req)
-                                       if err == nil {
-                                               contributed[peerIndex] = struct{}{}
-                                               alreadyRequested = true
-                                               //log.Printf("requested %v", req)
+                       for _, peer := range peersForPiece {
+                               if !peer.canFitRequest() {
+                                       continue
+                               }
+                               if !peer.hasPiece(p.index) {
+                                       continue
+                               }
+                               if !peer.pieceAllowedFast(p.index) {
+                                       // TODO: Verify that it's okay to stay uninterested if we request
+                                       // allowed-fast pieces.
+                                       peer.nextInterest = true
+                                       if peer.choking() {
+                                               continue
                                        }
                                }
+                               peer.addNextRequest(req)
+                               return true
                        }
                        return true
                })
-               // Move requestees for this piece to the back.
-               lastIndex := len(peers) - 1
-               // Probably should sort the contributees, to make the ordering more deterministic.
-               for peerIndex := range contributed {
-                       peers[peerIndex], peers[lastIndex] = peers[lastIndex], peers[peerIndex]
-                       delete(contributed, peerIndex)
-                       lastIndex--
+               for _, peer := range peersForPiece {
+                       if peer.canRequestPiece(p.index) {
+                               peer.requestablePiecesRemaining--
+                       }
                }
        }
-       for _, t := range cl.torrents {
-               t.iterPeers(func(p *Peer) {
-                       if !p.peerChoking && p.numLocalRequests() == 0 && !p.writeBufferFull() {
-                               p.setInterested(false)
+       for _, peers := range allPeers {
+               for _, rp := range peers {
+                       if rp.requestablePiecesRemaining != 0 {
+                               panic(rp.requestablePiecesRemaining)
                        }
-               })
+                       applyPeerNextRequests(rp)
+               }
+       }
+}
+
+func applyPeerNextRequests(rp *requestsPeer) {
+       p := rp.cur
+       p.setInterested(rp.nextInterest)
+       for req := range p.requests {
+               if _, ok := rp.nextRequests[req]; !ok {
+                       p.cancel(req)
+               }
+       }
+       for req := range rp.nextRequests {
+               err := p.request(req)
+               if err != nil {
+                       panic(err)
+               } else {
+                       //log.Print(req)
+               }
        }
 }
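applyPeerNextRequests reconciles each peer's live request set against the planned one: interest is set first, stale requests are cancelled before new ones are issued (so cancellations free slots under nominalMaxRequests), and a failing request panics because everything in nextRequests already passed shouldRequest during planning. The same reconcile shape over plain sets, as a sketch:

    package main

    import "fmt"

    // reconcile drives a live set toward a desired set, cancelling before
    // requesting, the same shape as applyPeerNextRequests (stand-in types;
    // the real request() also dedupes requests that are already live).
    func reconcile(live, next map[string]bool, cancel, request func(string)) {
        for r := range live {
            if !next[r] {
                cancel(r)
            }
        }
        for r := range next {
            if !live[r] {
                request(r)
            }
        }
    }

    func main() {
        live := map[string]bool{"a": true, "b": true}
        next := map[string]bool{"b": true, "c": true}
        reconcile(live, next,
            func(r string) { fmt.Println("cancel", r) },
            func(r string) { fmt.Println("request", r) })
    }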