From 09823b9e361741cfc88a5ce088294ec6253a17d3 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 12 May 2021 17:45:36 +1000 Subject: [PATCH] Rework to improve work stealing and try to thread peers through all request pieces --- misc.go | 10 ++ peerconn.go | 36 +++--- piece.go | 12 +- request-strategy.go | 261 +++++++++++++++++++++++++++++--------------- 4 files changed, 209 insertions(+), 110 deletions(-) diff --git a/misc.go b/misc.go index 5ad01d85..0cd85a3f 100644 --- a/misc.go +++ b/misc.go @@ -151,6 +151,16 @@ func min(as ...int64) int64 { return ret } +func minInt(as ...int) int { + ret := as[0] + for _, a := range as[1:] { + if a < ret { + ret = a + } + } + return ret +} + var unlimited = rate.NewLimiter(rate.Inf, 0) type ( diff --git a/peerconn.go b/peerconn.go index 7e20ba3b..1721663c 100644 --- a/peerconn.go +++ b/peerconn.go @@ -173,17 +173,18 @@ func (cn *Peer) expectingChunks() bool { if !cn.interested { return false } - if !cn.peerChoking { - return true - } for r := range cn.requests { - if cn.peerAllowedFast.Contains(bitmap.BitIndex(r.Index)) { + if !cn.remoteChokingPiece(r.Index.Int()) { return true } } return false } +func (cn *Peer) remoteChokingPiece(piece pieceIndex) bool { + return cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(piece)) +} + // Returns true if the connection is over IPv6. func (cn *PeerConn) ipv6() bool { ip := cn.remoteIp() @@ -542,13 +543,7 @@ func (pc *PeerConn) writeInterested(interested bool) bool { // are okay. type messageWriter func(pp.Message) bool -func (cn *Peer) request(r Request) error { - if _, ok := cn.requests[r]; ok { - return nil - } - if cn.numLocalRequests() >= cn.nominalMaxRequests() { - return errors.New("too many outstanding requests") - } +func (cn *Peer) shouldRequest(r Request) error { if !cn.peerHasPiece(pieceIndex(r.Index)) { return errors.New("requesting piece peer doesn't have") } @@ -564,15 +559,18 @@ func (cn *Peer) request(r Request) error { if cn.t.pieceQueuedForHash(pieceIndex(r.Index)) { panic("piece is queued for hash") } - if !cn.setInterested(true) { - return errors.New("write buffer full after expressing interest") + return nil +} + +func (cn *Peer) request(r Request) error { + if err := cn.shouldRequest(r); err != nil { + panic(err) } - if cn.peerChoking { - if cn.peerAllowedFast.Get(int(r.Index)) { - torrent.Add("allowed fast requests sent", 1) - } else { - errors.New("peer choking and piece not in allowed fast set") - } + if _, ok := cn.requests[r]; ok { + return nil + } + if cn.numLocalRequests() >= cn.nominalMaxRequests() { + return errors.New("too many outstanding requests") } if cn.requests == nil { cn.requests = make(map[Request]struct{}) diff --git a/piece.go b/piece.go index cee6b7d0..af9607c4 100644 --- a/piece.go +++ b/piece.go @@ -222,10 +222,7 @@ func (p *Piece) SetPriority(prio piecePriority) { p.t.updatePiecePriority(p.index) } -func (p *Piece) uncachedPriority() (ret piecePriority) { - if p.t.pieceComplete(p.index) || p.t.pieceQueuedForHash(p.index) || p.t.hashingPiece(p.index) { - return PiecePriorityNone - } +func (p *Piece) purePriority() (ret piecePriority) { for _, f := range p.files { ret.Raise(f.prio) } @@ -242,6 +239,13 @@ func (p *Piece) uncachedPriority() (ret piecePriority) { return } +func (p *Piece) uncachedPriority() (ret piecePriority) { + if p.t.pieceComplete(p.index) || p.t.pieceQueuedForHash(p.index) || p.t.hashingPiece(p.index) { + return PiecePriorityNone + } + return p.purePriority() +} + // Tells the Client to refetch the completion status from storage, updating priority etc. if // necessary. Might be useful if you know the state of the piece data has changed externally. func (p *Piece) UpdateCompletion() { diff --git a/request-strategy.go b/request-strategy.go index 1f906510..5daa7de1 100644 --- a/request-strategy.go +++ b/request-strategy.go @@ -20,45 +20,17 @@ type pieceRequestOrderPiece struct { prio piecePriority partial bool availability int64 -} - -func (me *clientPieceRequestOrder) addPieces(t *Torrent, numPieces pieceIndex) { - for i := range iter.N(numPieces) { - me.pieces = append(me.pieces, pieceRequestOrderPiece{ - t: t, - index: i, - }) - } + request bool } func (me *clientPieceRequestOrder) Len() int { return len(me.pieces) } -func (me *clientPieceRequestOrder) removePieces(t *Torrent) { - newPieces := make([]pieceRequestOrderPiece, 0, len(me.pieces)-t.numPieces()) - for _, p := range me.pieces { - if p.t != t { - newPieces = append(newPieces, p) - } - } - me.pieces = newPieces -} - func (me clientPieceRequestOrder) sort() { sort.Slice(me.pieces, me.less) } -func (me *clientPieceRequestOrder) update() { - for i := range me.pieces { - p := &me.pieces[i] - tp := p.t.piece(p.index) - p.prio = tp.uncachedPriority() - p.partial = p.t.piecePartiallyDownloaded(p.index) - p.availability = tp.availability - } -} - func (me clientPieceRequestOrder) less(_i, _j int) bool { i := me.pieces[_i] j := me.pieces[_j] @@ -66,9 +38,7 @@ func (me clientPieceRequestOrder) less(_i, _j int) bool { int(j.prio), int(i.prio), ).Bool( j.partial, i.partial, - ).Int64( - i.availability, j.availability, - ).Less() + ).Int64(i.availability, j.availability).Int(i.index, j.index).Less() } func (cl *Client) requester() { @@ -86,45 +56,111 @@ func (cl *Client) requester() { } } +type requestsPeer struct { + cur *Peer + nextRequests map[Request]struct{} + nextInterest bool + requestablePiecesRemaining int +} + +func (rp *requestsPeer) canRequestPiece(p pieceIndex) bool { + return rp.hasPiece(p) && (!rp.choking() || rp.pieceAllowedFast(p)) +} + +func (rp *requestsPeer) hasPiece(i pieceIndex) bool { + return rp.cur.peerHasPiece(i) +} + +func (rp *requestsPeer) pieceAllowedFast(p pieceIndex) bool { + return rp.cur.peerAllowedFast.Contains(p) +} + +func (rp *requestsPeer) choking() bool { + return rp.cur.peerChoking +} + +func (rp *requestsPeer) hasExistingRequest(r Request) bool { + _, ok := rp.cur.requests[r] + return ok +} + +func (rp *requestsPeer) canFitRequest() bool { + return len(rp.nextRequests) < rp.cur.nominalMaxRequests() +} + +// Returns true if it is added and wasn't there before. +func (rp *requestsPeer) addNextRequest(r Request) bool { + _, ok := rp.nextRequests[r] + if ok { + return false + } + rp.nextRequests[r] = struct{}{} + return true +} + +type peersForPieceRequests struct { + requestsInPiece int + *requestsPeer +} + +func (me *peersForPieceRequests) addNextRequest(r Request) { + if me.requestsPeer.addNextRequest(r) { + return + me.requestsInPiece++ + } +} + func (cl *Client) doRequests() { requestOrder := &cl.pieceRequestOrder requestOrder.pieces = requestOrder.pieces[:0] - allPeers := make(map[*Torrent][]*Peer) + allPeers := make(map[*Torrent][]*requestsPeer) // Storage capacity left for this run, keyed by the storage capacity pointer on the storage // TorrentImpl. storageLeft := make(map[*func() *int64]*int64) for _, t := range cl.torrents { // TODO: We could do metainfo requests here. - if t.haveInfo() { - key := t.storage.Capacity - if key != nil { - if _, ok := storageLeft[key]; !ok { - storageLeft[key] = (*key)() - } + if !t.haveInfo() { + continue + } + key := t.storage.Capacity + if key != nil { + if _, ok := storageLeft[key]; !ok { + storageLeft[key] = (*key)() } - requestOrder.addPieces(t, t.numPieces()) } - var peers []*Peer + var peers []*requestsPeer t.iterPeers(func(p *Peer) { if !p.closed.IsSet() { - peers = append(peers, p) + peers = append(peers, &requestsPeer{ + cur: p, + nextRequests: make(map[Request]struct{}), + }) } }) - // Sort in *desc* order, approximately the reverse of worseConn where appropriate. - sort.Slice(peers, func(i, j int) bool { - return multiless.New().Float64( - peers[j].downloadRate(), peers[i].downloadRate(), - ).Uintptr( - uintptr(unsafe.Pointer(peers[j])), uintptr(unsafe.Pointer(peers[i]))).Less() - }) + for i := range iter.N(t.numPieces()) { + tp := t.piece(i) + pp := tp.purePriority() + request := !t.ignorePieceForRequests(i) + requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{ + t: t, + index: i, + prio: pp, + partial: t.piecePartiallyDownloaded(i), + availability: tp.availability, + request: request, + }) + if request { + for _, p := range peers { + if p.canRequestPiece(i) { + p.requestablePiecesRemaining++ + } + } + } + } allPeers[t] = peers } - requestOrder.update() requestOrder.sort() - // For a given piece, the set of allPeers indices that absorbed requests for the piece. - contributed := make(map[int]struct{}) for _, p := range requestOrder.pieces { - peers := allPeers[p.t] torrentPiece := p.t.piece(p.index) if left := storageLeft[p.t.storage.Capacity]; left != nil { if *left < int64(torrentPiece.length()) { @@ -132,53 +168,104 @@ func (cl *Client) doRequests() { } *left -= int64(torrentPiece.length()) } - if p.t.ignorePieceForRequests(p.index) { + if !p.request { continue } - p.t.piece(p.index).iterUndirtiedChunks(func(chunk ChunkSpec) bool { + peersForPiece := make([]*peersForPieceRequests, 0, len(allPeers[p.t])) + for _, peer := range allPeers[p.t] { + peersForPiece = append(peersForPiece, &peersForPieceRequests{ + requestsInPiece: 0, + requestsPeer: peer, + }) + } + sortPeersForPiece := func() { + sort.Slice(peersForPiece, func(i, j int) bool { + return multiless.New().Bool( + peersForPiece[j].canFitRequest(), + peersForPiece[i].canFitRequest(), + ).Int( + peersForPiece[i].requestsInPiece, + peersForPiece[j].requestsInPiece, + ).Int( + peersForPiece[i].requestablePiecesRemaining, + peersForPiece[j].requestablePiecesRemaining, + ).Float64( + peersForPiece[j].cur.downloadRate(), + peersForPiece[i].cur.downloadRate(), + ).EagerSameLess( + peersForPiece[i].cur.completedHandshake.Equal(peersForPiece[j].cur.completedHandshake), + peersForPiece[i].cur.completedHandshake.Before(peersForPiece[j].cur.completedHandshake), + // TODO: Probably peer priority can come next + ).Uintptr( + uintptr(unsafe.Pointer(peersForPiece[j].cur)), + uintptr(unsafe.Pointer(peersForPiece[i].cur)), + ).Less() + }) + } + pendingChunksRemaining := int(p.t.pieceNumPendingChunks(p.index)) + torrentPiece.iterUndirtiedChunks(func(chunk ChunkSpec) bool { req := Request{pp.Integer(p.index), chunk} - const skipAlreadyRequested = false - if skipAlreadyRequested { - alreadyRequested := false - p.t.iterPeers(func(p *Peer) { - if _, ok := p.requests[req]; ok { - alreadyRequested = true - } - }) - if alreadyRequested { + pendingChunksRemaining-- + sortPeersForPiece() + for i, peer := range peersForPiece { + if i > pendingChunksRemaining { + break + } + if peer.hasExistingRequest(req) && peer.canFitRequest() { + peer.addNextRequest(req) return true } } - alreadyRequested := false - for peerIndex, peer := range peers { - if alreadyRequested { - // Cancel all requests from "slower" peers after the one that requested it. - peer.cancel(req) - } else { - err := peer.request(req) - if err == nil { - contributed[peerIndex] = struct{}{} - alreadyRequested = true - //log.Printf("requested %v", req) + for _, peer := range peersForPiece { + if !peer.canFitRequest() { + continue + } + if !peer.hasPiece(p.index) { + continue + } + if !peer.pieceAllowedFast(p.index) { + // TODO: Verify that's okay to stay uninterested if we request allowed fast + // pieces. + peer.nextInterest = true + if peer.choking() { + continue } } + peer.addNextRequest(req) + return true } return true }) - // Move requestees for this piece to the back. - lastIndex := len(peers) - 1 - // Probably should sort the contributees, to make the ordering more deterministic. - for peerIndex := range contributed { - peers[peerIndex], peers[lastIndex] = peers[lastIndex], peers[peerIndex] - delete(contributed, peerIndex) - lastIndex-- + for _, peer := range peersForPiece { + if peer.canRequestPiece(p.index) { + peer.requestablePiecesRemaining-- + } } } - for _, t := range cl.torrents { - t.iterPeers(func(p *Peer) { - if !p.peerChoking && p.numLocalRequests() == 0 && !p.writeBufferFull() { - p.setInterested(false) + for _, peers := range allPeers { + for _, rp := range peers { + if rp.requestablePiecesRemaining != 0 { + panic(rp.requestablePiecesRemaining) } - }) + applyPeerNextRequests(rp) + } + } +} + +func applyPeerNextRequests(rp *requestsPeer) { + p := rp.cur + p.setInterested(rp.nextInterest) + for req := range p.requests { + if _, ok := rp.nextRequests[req]; !ok { + p.cancel(req) + } + } + for req := range rp.nextRequests { + err := p.request(req) + if err != nil { + panic(err) + } else { + //log.Print(req) + } } } -- 2.48.1