From: Matt Joiner Date: Sun, 12 Dec 2021 07:38:33 +0000 (+1100) Subject: Merge branch 'request-strategy-experiments' X-Git-Tag: v1.39.0 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=3d3052b8bddd8b3df9485c13ff639f47fb527f9d;hp=8e99558e116eea2832166c862bf9be85c674a240;p=btrtrc.git Merge branch 'request-strategy-experiments' This should bring in significant performance improvements that fix issues with the peer-requesting that existed from v1.34.0. --- diff --git a/client.go b/client.go index c491118b..9d0777e4 100644 --- a/client.go +++ b/client.go @@ -27,6 +27,7 @@ import ( "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/missinggo/v2/pproffd" "github.com/anacrolix/sync" + request_strategy "github.com/anacrolix/torrent/request-strategy" "github.com/davecgh/go-spew/spew" "github.com/dustin/go-humanize" "github.com/google/btree" @@ -74,6 +75,7 @@ type Client struct { dopplegangerAddrs map[string]struct{} badPeerIPs map[string]struct{} torrents map[InfoHash]*Torrent + pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder acceptLimiter map[ipStr]int dialRateLimiter *rate.Limiter diff --git a/peer-impl.go b/peer-impl.go index f7140377..47b4345a 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -15,11 +15,10 @@ type peerImpl interface { isLowOnRequests() bool writeInterested(interested bool) bool - // Neither of these return buffer room anymore, because they're currently both posted. There's - // also PeerConn.writeBufferFull for when/where it matters. - _cancel(RequestIndex) bool + // _cancel initiates cancellation of a request and returns acked if it expects the cancel to be + // handled by a follow-up event. + _cancel(RequestIndex) (acked bool) _request(Request) bool - connectionFlags() string onClose() onGotInfo(*metainfo.Info) diff --git a/peerconn.go b/peerconn.go index fc3d9df3..098d93fc 100644 --- a/peerconn.go +++ b/peerconn.go @@ -52,7 +52,7 @@ type PeerRemoteAddr interface { // indexable with the memory space available. type ( maxRequests = int - requestState = request_strategy.PeerNextRequestState + requestState = request_strategy.PeerRequestState ) type Peer struct { @@ -84,9 +84,8 @@ type Peer struct { // Stuff controlled by the local peer. needRequestUpdate string - actualRequestState requestState + requestState requestState updateRequestsTimer *time.Timer - cancelledRequests roaring.Bitmap lastBecameInterested time.Time priorInterest time.Duration @@ -97,9 +96,9 @@ type Peer struct { choking bool piecesReceivedSinceLastRequestUpdate maxRequests maxPiecesReceivedBetweenRequestUpdates maxRequests - // Chunks that we might reasonably expect to receive from the peer. Due to - // latency, buffering, and implementation differences, we may receive - // chunks that are no longer in the set of requests actually want. + // Chunks that we might reasonably expect to receive from the peer. Due to latency, buffering, + // and implementation differences, we may receive chunks that are no longer in the set of + // requests actually want. This could use a roaring.BSI if the memory use becomes noticeable. validReceiveChunks map[RequestIndex]int // Indexed by metadata piece, set to true if posted and pending a // response. @@ -177,10 +176,10 @@ func (cn *Peer) updateExpectingChunks() { } func (cn *Peer) expectingChunks() bool { - if cn.actualRequestState.Requests.IsEmpty() { + if cn.requestState.Requests.IsEmpty() { return false } - if !cn.actualRequestState.Interested { + if !cn.requestState.Interested { return false } if !cn.peerChoking { @@ -189,7 +188,7 @@ func (cn *Peer) expectingChunks() bool { haveAllowedFastRequests := false cn.peerAllowedFast.Iterate(func(i uint32) bool { haveAllowedFastRequests = roaringBitmapRangeCardinality( - &cn.actualRequestState.Requests, + &cn.requestState.Requests, cn.t.pieceRequestIndexOffset(pieceIndex(i)), cn.t.pieceRequestIndexOffset(pieceIndex(i+1)), ) == 0 @@ -230,7 +229,7 @@ func (l *PeerConn) hasPreferredNetworkOver(r *PeerConn) (left, ok bool) { func (cn *Peer) cumInterest() time.Duration { ret := cn.priorInterest - if cn.actualRequestState.Interested { + if cn.requestState.Interested { ret += time.Since(cn.lastBecameInterested) } return ret @@ -318,7 +317,7 @@ func (cn *Peer) statusFlags() (ret string) { c := func(b byte) { ret += string([]byte{b}) } - if cn.actualRequestState.Interested { + if cn.requestState.Interested { c('i') } if cn.choking { @@ -346,7 +345,7 @@ func (cn *Peer) downloadRate() float64 { func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) { ret = make(map[pieceIndex]int) - cn.actualRequestState.Requests.Iterate(func(x uint32) bool { + cn.requestState.Requests.Iterate(func(x uint32) bool { ret[pieceIndex(x/cn.t.chunksPerRegularPiece())]++ return true }) @@ -373,14 +372,14 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) { cn.totalExpectingTime(), ) fmt.Fprintf(w, - " %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d-%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n", + " %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n", cn.completedString(), len(cn.peerTouchedPieces), &cn._stats.ChunksReadUseful, &cn._stats.ChunksRead, &cn._stats.ChunksWritten, - cn.actualRequestState.Requests.GetCardinality(), - cn.cancelledRequests.GetCardinality(), + cn.requestState.Requests.GetCardinality(), + cn.requestState.Cancelled.GetCardinality(), cn.nominalMaxRequests(), cn.PeerMaxRequests, len(cn.peerRequests), @@ -537,10 +536,10 @@ func (cn *PeerConn) unchoke(msg func(pp.Message) bool) bool { } func (cn *Peer) setInterested(interested bool) bool { - if cn.actualRequestState.Interested == interested { + if cn.requestState.Interested == interested { return true } - cn.actualRequestState.Interested = interested + cn.requestState.Interested = interested if interested { cn.lastBecameInterested = time.Now() } else if !cn.lastBecameInterested.IsZero() { @@ -589,7 +588,7 @@ func (cn *Peer) shouldRequest(r RequestIndex) error { if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(pi)) { // This could occur if we made a request with the fast extension, and then got choked and // haven't had the request rejected yet. - if !cn.actualRequestState.Requests.Contains(r) { + if !cn.requestState.Requests.Contains(r) { panic("peer choking and piece not allowed fast") } } @@ -608,18 +607,19 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) { if err := cn.shouldRequest(r); err != nil { panic(err) } - if cn.actualRequestState.Requests.Contains(r) { + if cn.requestState.Requests.Contains(r) { return true, nil } - if maxRequests(cn.actualRequestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() { + if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() { return true, errors.New("too many outstanding requests") } - cn.actualRequestState.Requests.Add(r) + cn.requestState.Requests.Add(r) if cn.validReceiveChunks == nil { cn.validReceiveChunks = make(map[RequestIndex]int) } cn.validReceiveChunks[r]++ - cn.t.pendingRequests.Inc(r) + cn.t.pendingRequests[r] = cn + cn.t.lastRequested[r] = time.Now() cn.updateExpectingChunks() ppReq := cn.t.requestIndexToRequest(r) for _, f := range cn.callbacks.SentRequest { @@ -637,31 +637,25 @@ func (me *PeerConn) _request(r Request) bool { }) } -func (me *Peer) cancel(r RequestIndex) bool { - if !me.actualRequestState.Requests.Contains(r) { - return true +func (me *Peer) cancel(r RequestIndex) { + if !me.deleteRequest(r) { + panic("request not existing should have been guarded") + } + if me._cancel(r) { + if !me.requestState.Cancelled.CheckedAdd(r) { + panic("request already cancelled") + } + } + if me.isLowOnRequests() { + me.updateRequests("Peer.cancel") } - return me._cancel(r) } func (me *PeerConn) _cancel(r RequestIndex) bool { - if me.cancelledRequests.Contains(r) { - // Already cancelled and waiting for a response. - return true - } + me.write(makeCancelMessage(me.t.requestIndexToRequest(r))) // Transmission does not send rejects for received cancels. See // https://github.com/transmission/transmission/pull/2275. - if me.fastEnabled() && !me.remoteIsTransmission() { - me.cancelledRequests.Add(r) - } else { - if !me.deleteRequest(r) { - panic("request not existing should have been guarded") - } - if me.isLowOnRequests() { - me.updateRequests("Peer.cancel") - } - } - return me.write(makeCancelMessage(me.t.requestIndexToRequest(r))) + return me.fastEnabled() && !me.remoteIsTransmission() } func (cn *PeerConn) fillWriteBuffer() { @@ -1101,18 +1095,13 @@ func (c *PeerConn) mainReadLoop() (err error) { c.deleteAllRequests() } else { // We don't decrement pending requests here, let's wait for the peer to either - // reject or satisfy the outstanding requests. Additionally some peers may unchoke + // reject or satisfy the outstanding requests. Additionally, some peers may unchoke // us and resume where they left off, we don't want to have piled on to those chunks - // in the meanwhile. I think a peers ability to abuse this should be limited: they + // in the meanwhile. I think a peer's ability to abuse this should be limited: they // could let us request a lot of stuff, then choke us and never reject, but they're // only a single peer, our chunk balancing should smooth over this abuse. } c.peerChoking = true - // We can now reset our interest. I think we do this after setting the flag in case the - // peerImpl updates synchronously (webseeds?). - if !c.actualRequestState.Requests.IsEmpty() { - c.updateRequests("choked") - } c.updateExpectingChunks() case pp.Unchoke: if !c.peerChoking { @@ -1123,7 +1112,7 @@ func (c *PeerConn) mainReadLoop() (err error) { } c.peerChoking = false preservedCount := 0 - c.actualRequestState.Requests.Iterate(func(x uint32) bool { + c.requestState.Requests.Iterate(func(x uint32) bool { if !c.peerAllowedFast.Contains(x / c.t.chunksPerRegularPiece()) { preservedCount++ } @@ -1193,7 +1182,11 @@ func (c *PeerConn) mainReadLoop() (err error) { case pp.HaveNone: err = c.peerSentHaveNone() case pp.Reject: - c.remoteRejectedRequest(c.t.requestIndexFromRequest(newRequestFromMessage(&msg))) + req := newRequestFromMessage(&msg) + if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req)) { + log.Printf("received invalid reject [request=%v, peer=%v]", req, c) + err = fmt.Errorf("received invalid reject [request=%v]", req) + } case pp.AllowedFast: torrent.Add("allowed fasts received", 1) log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger) @@ -1209,13 +1202,16 @@ func (c *PeerConn) mainReadLoop() (err error) { } } -func (c *Peer) remoteRejectedRequest(r RequestIndex) { - if c.deleteRequest(r) { - if c.isLowOnRequests() { - c.updateRequests("Peer.remoteRejectedRequest") - } - c.decExpectedChunkReceive(r) +// Returns true if it was valid to reject the request. +func (c *Peer) remoteRejectedRequest(r RequestIndex) bool { + if !c.deleteRequest(r) && !c.requestState.Cancelled.CheckedRemove(r) { + return false } + if c.isLowOnRequests() { + c.updateRequests("Peer.remoteRejectedRequest") + } + c.decExpectedChunkReceive(r) + return true } func (c *Peer) decExpectedChunkReceive(r RequestIndex) { @@ -1341,16 +1337,16 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { // The request needs to be deleted immediately to prevent cancels occurring asynchronously when // have actually already received the piece, while we have the Client unlocked to write the data // out. - deletedRequest := false + intended := false { - if c.actualRequestState.Requests.Contains(req) { + if c.requestState.Requests.Contains(req) { for _, f := range c.callbacks.ReceivedRequested { f(PeerMessageEvent{c, msg}) } } // Request has been satisfied. - if c.deleteRequest(req) { - deletedRequest = true + if c.deleteRequest(req) || c.requestState.Cancelled.CheckedRemove(req) { + intended = true if !c.peerChoking { c._chunksReceivedWhileExpecting++ } @@ -1358,7 +1354,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { c.updateRequests("Peer.receiveChunk deleted request") } } else { - chunksReceived.Add("unwanted", 1) + chunksReceived.Add("unintended", 1) } } @@ -1368,7 +1364,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { // Do we actually want this chunk? if t.haveChunk(ppReq) { // panic(fmt.Sprintf("%+v", ppReq)) - chunksReceived.Add("wasted", 1) + chunksReceived.Add("redundant", 1) c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted })) return nil } @@ -1377,7 +1373,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful })) c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData })) - if deletedRequest { + if intended { c.piecesReceivedSinceLastRequestUpdate++ c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData })) } @@ -1394,12 +1390,12 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { piece.unpendChunkIndex(chunkIndexFromChunkSpec(ppReq.ChunkSpec, t.chunkSize)) // Cancel pending requests for this chunk from *other* peers. - t.iterPeers(func(p *Peer) { + if p := t.pendingRequests[req]; p != nil { if p == c { - return + panic("should not be pending request from conn that just received it") } p.cancel(req) - }) + } err := func() error { cl.unlock() @@ -1540,26 +1536,33 @@ func (c *Peer) peerHasWantedPieces() bool { return c.peerPieces().Intersects(&c.t._pendingPieces) } +// Returns true if an outstanding request is removed. Cancelled requests should be handled +// separately. func (c *Peer) deleteRequest(r RequestIndex) bool { - if !c.actualRequestState.Requests.CheckedRemove(r) { + if !c.requestState.Requests.CheckedRemove(r) { return false } - c.cancelledRequests.Remove(r) for _, f := range c.callbacks.DeletedRequest { f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)}) } c.updateExpectingChunks() - c.t.pendingRequests.Dec(r) + if c.t.requestingPeer(r) != c { + panic("only one peer should have a given request at a time") + } + delete(c.t.pendingRequests, r) + delete(c.t.lastRequested, r) return true } func (c *Peer) deleteAllRequests() { - c.actualRequestState.Requests.Clone().Iterate(func(x uint32) bool { - c.deleteRequest(x) + c.requestState.Requests.Clone().Iterate(func(x uint32) bool { + if !c.deleteRequest(x) { + panic("request should exist") + } return true }) - if !c.actualRequestState.Requests.IsEmpty() { - panic(c.actualRequestState.Requests.GetCardinality()) + if !c.requestState.Requests.IsEmpty() { + panic(c.requestState.Requests.GetCardinality()) } } @@ -1689,7 +1692,11 @@ func (p *Peer) TryAsPeerConn() (*PeerConn, bool) { } func (pc *PeerConn) isLowOnRequests() bool { - return pc.actualRequestState.Requests.IsEmpty() + return pc.requestState.Requests.IsEmpty() && pc.requestState.Cancelled.IsEmpty() +} + +func (p *Peer) uncancelledRequests() uint64 { + return p.requestState.Requests.GetCardinality() } func (pc *PeerConn) remoteIsTransmission() bool { diff --git a/piece.go b/piece.go index bef5f59e..6caa7628 100644 --- a/piece.go +++ b/piece.go @@ -77,11 +77,13 @@ func (p *Piece) numDirtyChunks() chunkIndexType { func (p *Piece) unpendChunkIndex(i chunkIndexType) { p.t.dirtyChunks.Add(p.requestIndexOffset() + i) + p.t.updatePieceRequestOrder(p.index) p.readerCond.Broadcast() } func (p *Piece) pendChunkIndex(i RequestIndex) { p.t.dirtyChunks.Remove(p.requestIndexOffset() + i) + p.t.updatePieceRequestOrder(p.index) } func (p *Piece) numChunks() chunkIndexType { diff --git a/request-strategy-impls.go b/request-strategy-impls.go new file mode 100644 index 00000000..f4c12646 --- /dev/null +++ b/request-strategy-impls.go @@ -0,0 +1,74 @@ +package torrent + +import ( + "github.com/anacrolix/torrent/metainfo" + request_strategy "github.com/anacrolix/torrent/request-strategy" + "github.com/anacrolix/torrent/storage" +) + +type requestStrategyInput struct { + cl *Client + capFunc storage.TorrentCapacity +} + +func (r requestStrategyInput) Torrent(ih metainfo.Hash) request_strategy.Torrent { + return requestStrategyTorrent{r.cl.torrents[ih]} +} + +func (r requestStrategyInput) Capacity() (int64, bool) { + if r.capFunc == nil { + return 0, false + } + return (*r.capFunc)() +} + +func (r requestStrategyInput) MaxUnverifiedBytes() int64 { + return r.cl.config.MaxUnverifiedBytes +} + +var _ request_strategy.Input = requestStrategyInput{} + +// Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent. +func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) { + return requestStrategyInput{ + cl: cl, + capFunc: primaryTorrent.storage.Capacity, + } +} + +func (t *Torrent) getRequestStrategyInput() request_strategy.Input { + return t.cl.getRequestStrategyInput(t) +} + +type requestStrategyTorrent struct { + t *Torrent +} + +func (r requestStrategyTorrent) Piece(i int) request_strategy.Piece { + return requestStrategyPiece{r.t, i} +} + +func (r requestStrategyTorrent) ChunksPerPiece() uint32 { + return r.t.chunksPerRegularPiece() +} + +func (r requestStrategyTorrent) PieceLength() int64 { + return r.t.info.PieceLength +} + +var _ request_strategy.Torrent = requestStrategyTorrent{} + +type requestStrategyPiece struct { + t *Torrent + i pieceIndex +} + +func (r requestStrategyPiece) Request() bool { + return !r.t.ignorePieceForRequests(r.i) +} + +func (r requestStrategyPiece) NumPendingChunks() int { + return int(r.t.pieceNumPendingChunks(r.i)) +} + +var _ request_strategy.Piece = requestStrategyPiece{} diff --git a/request-strategy/order.go b/request-strategy/order.go index 9fb45002..0fa51698 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -2,10 +2,11 @@ package request_strategy import ( "bytes" - "sort" - "sync" + "expvar" "github.com/anacrolix/multiless" + "github.com/anacrolix/torrent/metainfo" + "github.com/google/btree" "github.com/anacrolix/torrent/types" ) @@ -20,333 +21,71 @@ type ( ChunkSpec = types.ChunkSpec ) -type ClientPieceOrder struct{} - -type filterTorrent struct { - *Torrent - unverifiedBytes int64 -} - -func sortFilterPieces(pieces []filterPiece) { - sort.Slice(pieces, func(_i, _j int) bool { - i := &pieces[_i] - j := &pieces[_j] - return multiless.New().Int( - int(j.Priority), int(i.Priority), - ).Bool( - j.Partial, i.Partial, - ).Int64( - i.Availability, j.Availability, - ).Int( - i.index, j.index, - ).Lazy(func() multiless.Computation { - return multiless.New().Cmp(bytes.Compare( - i.t.InfoHash[:], - j.t.InfoHash[:], - )) - }).MustLess() +type pieceOrderInput struct { + PieceRequestOrderState + PieceRequestOrderKey +} + +func pieceOrderLess(i, j pieceOrderInput) multiless.Computation { + return multiless.New().Int( + int(j.Priority), int(i.Priority), + ).Bool( + j.Partial, i.Partial, + ).Int64( + i.Availability, j.Availability, + ).Int( + i.Index, j.Index, + ).Lazy(func() multiless.Computation { + return multiless.New().Cmp(bytes.Compare( + i.InfoHash[:], + j.InfoHash[:], + )) }) } -type requestsPeer struct { - Peer - nextState PeerNextRequestState - requestablePiecesRemaining int -} - -func (rp *requestsPeer) canFitRequest() bool { - return int(rp.nextState.Requests.GetCardinality()) < rp.MaxRequests -} - -func (rp *requestsPeer) addNextRequest(r RequestIndex) { - if !rp.nextState.Requests.CheckedAdd(r) { - panic("should only add once") - } -} - -type peersForPieceRequests struct { - requestsInPiece int - *requestsPeer -} - -func (me *peersForPieceRequests) addNextRequest(r RequestIndex) { - me.requestsPeer.addNextRequest(r) - me.requestsInPiece++ -} - -type requestablePiece struct { - index pieceIndex - t *Torrent - alwaysReallocate bool - NumPendingChunks int - IterPendingChunks ChunksIterFunc -} - -func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex { - return p.t.ChunksPerPiece*uint32(p.index) + c -} - -type filterPiece struct { - t *filterTorrent - index pieceIndex - *Piece -} +var packageExpvarMap = expvar.NewMap("request-strategy") // Calls f with requestable pieces in order. -func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex int)) { - maxPieces := 0 - for i := range input.Torrents { - maxPieces += len(input.Torrents[i].Pieces) - } - pieces := make([]filterPiece, 0, maxPieces) +func GetRequestablePieces(input Input, pro *PieceRequestOrder, f func(ih metainfo.Hash, pieceIndex int)) { // Storage capacity left for this run, keyed by the storage capacity pointer on the storage // TorrentImpl. A nil value means no capacity limit. var storageLeft *int64 - if input.Capacity != nil { - storageLeft = new(int64) - *storageLeft = *input.Capacity - } - for _t := range input.Torrents { - // TODO: We could do metainfo requests here. - t := &filterTorrent{ - Torrent: &input.Torrents[_t], - unverifiedBytes: 0, - } - for i := range t.Pieces { - pieces = append(pieces, filterPiece{ - t: t, - index: i, - Piece: &t.Pieces[i], - }) - } + if cap, ok := input.Capacity(); ok { + storageLeft = &cap } - sortFilterPieces(pieces) var allTorrentsUnverifiedBytes int64 - for _, piece := range pieces { - if left := storageLeft; left != nil { - if *left < piece.Length { - continue + pro.tree.Ascend(func(i btree.Item) bool { + _i := i.(*pieceRequestOrderItem) + ih := _i.key.InfoHash + var t Torrent = input.Torrent(ih) + var piece Piece = t.Piece(_i.key.Index) + pieceLength := t.PieceLength() + if storageLeft != nil { + if *storageLeft < pieceLength { + return false } - *left -= piece.Length + *storageLeft -= pieceLength } - if !piece.Request || piece.NumPendingChunks == 0 { + if !piece.Request() || piece.NumPendingChunks() == 0 { // TODO: Clarify exactly what is verified. Stuff that's being hashed should be // considered unverified and hold up further requests. - continue + return true } - if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes { - continue + if input.MaxUnverifiedBytes() != 0 && allTorrentsUnverifiedBytes+pieceLength > input.MaxUnverifiedBytes() { + return true } - if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes { - continue - } - piece.t.unverifiedBytes += piece.Length - allTorrentsUnverifiedBytes += piece.Length - f(piece.t.Torrent, piece.Piece, piece.index) - } + allTorrentsUnverifiedBytes += pieceLength + f(ih, _i.key.Index) + return true + }) return } -type Input struct { - // This is all torrents that share the same capacity below (or likely a single torrent if there - // is infinite capacity, since you could just run it separately for each Torrent if that's the - // case). - Torrents []Torrent - // Must not be modified. Non-nil if capacity is not infinite, meaning that pieces of torrents - // that share the same capacity key must be incorporated in piece ordering. - Capacity *int64 +type Input interface { + Torrent(metainfo.Hash) Torrent + // Storage capacity, shared among all Torrents with the same storage.TorrentCapacity pointer in + // their storage.Torrent references. + Capacity() (cap int64, capped bool) // Across all the Torrents. This might be partitioned by storage capacity key now. - MaxUnverifiedBytes int64 -} - -// Checks that a sorted peersForPiece slice makes sense. -func ensureValidSortedPeersForPieceRequests(peers *peersForPieceSorter) { - if !sort.IsSorted(peers) { - panic("not sorted") - } - peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len()) - for _, p := range peers.peersForPiece { - if _, ok := peerMap[p]; ok { - panic(p) - } - peerMap[p] = struct{}{} - } -} - -var peersForPiecesPool sync.Pool - -func makePeersForPiece(cap int) []*peersForPieceRequests { - got := peersForPiecesPool.Get() - if got == nil { - return make([]*peersForPieceRequests, 0, cap) - } - return got.([]*peersForPieceRequests)[:0] -} - -type peersForPieceSorter struct { - peersForPiece []*peersForPieceRequests - req *RequestIndex - p requestablePiece -} - -func (me *peersForPieceSorter) Len() int { - return len(me.peersForPiece) -} - -func (me *peersForPieceSorter) Swap(i, j int) { - me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i] -} - -func (me *peersForPieceSorter) Less(_i, _j int) bool { - i := me.peersForPiece[_i] - j := me.peersForPiece[_j] - req := me.req - p := &me.p - byHasRequest := func() multiless.Computation { - ml := multiless.New() - if req != nil { - iHas := i.nextState.Requests.Contains(*req) - jHas := j.nextState.Requests.Contains(*req) - ml = ml.Bool(jHas, iHas) - } - return ml - }() - ml := multiless.New() - // We always "reallocate", that is force even striping amongst peers that are either on - // the last piece they can contribute too, or for pieces marked for this behaviour. - // Striping prevents starving peers of requests, and will always re-balance to the - // fastest known peers. - if !p.alwaysReallocate { - ml = ml.Bool( - j.requestablePiecesRemaining == 1, - i.requestablePiecesRemaining == 1) - } - if p.alwaysReallocate || j.requestablePiecesRemaining == 1 { - ml = ml.Int( - i.requestsInPiece, - j.requestsInPiece) - } else { - ml = ml.AndThen(byHasRequest) - } - ml = ml.Int( - i.requestablePiecesRemaining, - j.requestablePiecesRemaining, - ).Float64( - j.DownloadRate, - i.DownloadRate, - ) - if ml.Ok() { - return ml.Less() - } - ml = ml.AndThen(byHasRequest) - return ml.Int64( - int64(j.Age), int64(i.Age), - // TODO: Probably peer priority can come next - ).Uintptr( - i.Id.Uintptr(), - j.Id.Uintptr(), - ).MustLess() -} - -func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) { - peersForPiece := makePeersForPiece(len(peers)) - for _, peer := range peers { - if !peer.canRequestPiece(p.index) { - continue - } - if !peer.canFitRequest() { - peer.requestablePiecesRemaining-- - continue - } - peersForPiece = append(peersForPiece, &peersForPieceRequests{ - requestsInPiece: 0, - requestsPeer: peer, - }) - } - defer func() { - for _, peer := range peersForPiece { - peer.requestablePiecesRemaining-- - } - peersForPiecesPool.Put(peersForPiece) - }() - peersForPieceSorter := peersForPieceSorter{ - peersForPiece: peersForPiece, - p: p, - } - sortPeersForPiece := func(req *RequestIndex) { - peersForPieceSorter.req = req - sort.Sort(&peersForPieceSorter) - // ensureValidSortedPeersForPieceRequests(&peersForPieceSorter) - } - // Chunks can be preassigned several times, if peers haven't been able to update their "actual" - // with "next" request state before another request strategy run occurs. - preallocated := make([][]*peersForPieceRequests, p.t.ChunksPerPiece) - p.IterPendingChunks(func(spec ChunkIndex) { - req := p.chunkIndexToRequestIndex(spec) - for _, peer := range peersForPiece { - if !peer.ExistingRequests.Contains(req) { - continue - } - if !peer.canFitRequest() { - continue - } - preallocated[spec] = append(preallocated[spec], peer) - peer.addNextRequest(req) - } - }) - pendingChunksRemaining := int(p.NumPendingChunks) - p.IterPendingChunks(func(chunk ChunkIndex) { - if len(preallocated[chunk]) != 0 { - return - } - req := p.chunkIndexToRequestIndex(chunk) - defer func() { pendingChunksRemaining-- }() - sortPeersForPiece(nil) - for _, peer := range peersForPiece { - if !peer.canFitRequest() { - continue - } - if !peer.PieceAllowedFast.ContainsInt(p.index) { - // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces. - peer.nextState.Interested = true - if peer.Choking { - continue - } - } - peer.addNextRequest(req) - break - } - }) -chunk: - for chunk, prePeers := range preallocated { - if len(prePeers) == 0 { - continue - } - pendingChunksRemaining-- - req := p.chunkIndexToRequestIndex(ChunkIndex(chunk)) - for _, pp := range prePeers { - pp.requestsInPiece-- - } - sortPeersForPiece(&req) - for _, pp := range prePeers { - pp.nextState.Requests.Remove(req) - } - for _, peer := range peersForPiece { - if !peer.canFitRequest() { - continue - } - if !peer.PieceAllowedFast.ContainsInt(p.index) { - // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces. - peer.nextState.Interested = true - if peer.Choking { - continue - } - } - peer.addNextRequest(req) - continue chunk - } - } - if pendingChunksRemaining != 0 { - panic(pendingChunksRemaining) - } + MaxUnverifiedBytes() int64 } diff --git a/request-strategy/order_test.go b/request-strategy/order_test.go deleted file mode 100644 index bcac41d1..00000000 --- a/request-strategy/order_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package request_strategy - -import ( - "encoding/gob" - "testing" - - "github.com/RoaringBitmap/roaring" - qt "github.com/frankban/quicktest" - "github.com/google/go-cmp/cmp" -) - -func init() { - gob.Register(chunkIterRange(0)) - gob.Register(sliceChunksIter{}) -} - -type chunkIterRange ChunkIndex - -func (me chunkIterRange) Iter(f func(ChunkIndex)) { - for offset := ChunkIndex(0); offset < ChunkIndex(me); offset += 1 { - f(offset) - } -} - -type sliceChunksIter []ChunkIndex - -func chunkIter(offsets ...ChunkIndex) ChunksIter { - return sliceChunksIter(offsets) -} - -func (offsets sliceChunksIter) Iter(f func(ChunkIndex)) { - for _, offset := range offsets { - f(offset) - } -} - -func requestSetFromSlice(rs ...RequestIndex) (ret roaring.Bitmap) { - ret.AddMany(rs) - return -} - -func init() { - gob.Register(intPeerId(0)) -} - -type intPeerId int - -func (i intPeerId) Uintptr() uintptr { - return uintptr(i) -} - -var hasAllRequests = func() (all roaring.Bitmap) { - all.AddRange(0, roaring.MaxRange) - return -}() - -func checkNumRequestsAndInterest(c *qt.C, next PeerNextRequestState, num uint64, interest bool) { - addressableBm := next.Requests - c.Check(addressableBm.GetCardinality(), qt.ContentEquals, num) - c.Check(next.Interested, qt.Equals, interest) -} - -func checkResultsRequestsLen(t *testing.T, reqs roaring.Bitmap, l uint64) { - qt.Check(t, reqs.GetCardinality(), qt.Equals, l) -} - -var peerNextRequestStateChecker = qt.CmpEquals( - cmp.Transformer( - "bitmap", - func(bm roaring.Bitmap) []uint32 { - return bm.ToArray() - })) diff --git a/request-strategy/peer.go b/request-strategy/peer.go index b031d28e..6a69535f 100644 --- a/request-strategy/peer.go +++ b/request-strategy/peer.go @@ -1,37 +1,13 @@ package request_strategy import ( - "time" - "github.com/RoaringBitmap/roaring" ) -type PeerNextRequestState struct { +type PeerRequestState struct { Interested bool - Requests roaring.Bitmap -} - -type PeerId interface { - Uintptr() uintptr -} - -type Peer struct { - Pieces roaring.Bitmap - MaxRequests int - ExistingRequests roaring.Bitmap - Choking bool - PieceAllowedFast roaring.Bitmap - DownloadRate float64 - Age time.Duration - // This is passed back out at the end, so must support equality. Could be a type-param later. - Id PeerId -} - -// TODO: This might be used in more places I think. -func (p *Peer) canRequestPiece(i pieceIndex) bool { - return (!p.Choking || p.PieceAllowedFast.Contains(uint32(i))) && p.HasPiece(i) -} - -func (p *Peer) HasPiece(i pieceIndex) bool { - return p.Pieces.Contains(uint32(i)) + // Expecting + Requests roaring.Bitmap + // Cancelled and waiting response + Cancelled roaring.Bitmap } diff --git a/request-strategy/piece-request-order.go b/request-strategy/piece-request-order.go new file mode 100644 index 00000000..d906b835 --- /dev/null +++ b/request-strategy/piece-request-order.go @@ -0,0 +1,98 @@ +package request_strategy + +import ( + "fmt" + + "github.com/anacrolix/torrent/metainfo" + "github.com/google/btree" +) + +func NewPieceOrder() *PieceRequestOrder { + return &PieceRequestOrder{ + tree: btree.New(32), + keys: make(map[PieceRequestOrderKey]PieceRequestOrderState), + } +} + +type PieceRequestOrder struct { + tree *btree.BTree + keys map[PieceRequestOrderKey]PieceRequestOrderState +} + +type PieceRequestOrderKey struct { + InfoHash metainfo.Hash + Index int +} + +type PieceRequestOrderState struct { + Priority piecePriority + Partial bool + Availability int64 +} + +type pieceRequestOrderItem struct { + key PieceRequestOrderKey + state PieceRequestOrderState +} + +func (me *pieceRequestOrderItem) Less(other btree.Item) bool { + otherConcrete := other.(*pieceRequestOrderItem) + return pieceOrderLess( + pieceOrderInput{ + PieceRequestOrderState: me.state, + PieceRequestOrderKey: me.key, + }, + pieceOrderInput{ + PieceRequestOrderState: otherConcrete.state, + PieceRequestOrderKey: otherConcrete.key, + }, + ).Less() +} + +func (me *PieceRequestOrder) Add(key PieceRequestOrderKey, state PieceRequestOrderState) { + if _, ok := me.keys[key]; ok { + panic(key) + } + if me.tree.ReplaceOrInsert(&pieceRequestOrderItem{ + key: key, + state: state, + }) != nil { + panic("shouldn't already have this") + } + me.keys[key] = state +} + +func (me *PieceRequestOrder) Update(key PieceRequestOrderKey, state PieceRequestOrderState) { + item := me.existingItemForKey(key) + if item.state == state { + return + } + if me.tree.Delete(&item) == nil { + panic(fmt.Sprintf("%#v", key)) + } + item.state = state + if me.tree.ReplaceOrInsert(&item) != nil { + panic(key) + } + me.keys[key] = state +} + +func (me *PieceRequestOrder) existingItemForKey(key PieceRequestOrderKey) pieceRequestOrderItem { + return pieceRequestOrderItem{ + key: key, + state: me.keys[key], + } +} + +func (me *PieceRequestOrder) Delete(key PieceRequestOrderKey) { + item := me.existingItemForKey(key) + if me.tree.Delete(&item) == nil { + panic(key) + } + delete(me.keys, key) + // log.Printf("deleting %#v", key) +} + +func (me *PieceRequestOrder) Len() int { + return len(me.keys) +} diff --git a/request-strategy/piece.go b/request-strategy/piece.go index 8a038e67..626cc75b 100644 --- a/request-strategy/piece.go +++ b/request-strategy/piece.go @@ -6,19 +6,7 @@ type ChunksIter interface { Iter(func(ci ChunkIndex)) } -type Piece struct { - Request bool - Priority piecePriority - Partial bool - Availability int64 - Length int64 - NumPendingChunks int - IterPendingChunks ChunksIter -} - -func (p Piece) iterPendingChunksWrapper(f func(ChunkIndex)) { - i := p.IterPendingChunks - if i != nil { - i.Iter(f) - } +type Piece interface { + Request() bool + NumPendingChunks() int } diff --git a/request-strategy/torrent.go b/request-strategy/torrent.go index dbb41df3..51fc1a6c 100644 --- a/request-strategy/torrent.go +++ b/request-strategy/torrent.go @@ -1,14 +1,7 @@ package request_strategy -import ( - "github.com/anacrolix/torrent/metainfo" -) - -type Torrent struct { - Pieces []Piece - // Some value that's unique and stable between runs. - InfoHash metainfo.Hash - ChunksPerPiece uint32 - // TODO: This isn't actually configurable anywhere yet. - MaxUnverifiedBytes int64 +type Torrent interface { + Piece(int) Piece + ChunksPerPiece() uint32 + PieceLength() int64 } diff --git a/requesting.go b/requesting.go index 5ad98ea8..a3a7e1c3 100644 --- a/requesting.go +++ b/requesting.go @@ -4,7 +4,6 @@ import ( "container/heap" "context" "encoding/gob" - "math/rand" "reflect" "runtime/pprof" "time" @@ -16,60 +15,12 @@ import ( request_strategy "github.com/anacrolix/torrent/request-strategy" ) -// Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent. -func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) { - input.MaxUnverifiedBytes = cl.config.MaxUnverifiedBytes - if !primaryTorrent.haveInfo() { - return - } - if capFunc := primaryTorrent.storage.Capacity; capFunc != nil { - if cap, ok := (*capFunc)(); ok { - input.Capacity = &cap - } +func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState { + return request_strategy.PieceRequestOrderState{ + Priority: t.piece(i).purePriority(), + Partial: t.piecePartiallyDownloaded(i), + Availability: t.piece(i).availability, } - if input.Capacity == nil { - input.Torrents = []request_strategy.Torrent{primaryTorrent.requestStrategyTorrentInput()} - return - } - input.Torrents = make([]request_strategy.Torrent, 0, len(cl.torrents)) - for _, t := range cl.torrents { - if !t.haveInfo() { - // This would be removed if metadata is handled here. Determining chunks per piece - // requires the info. If we have no info, we have no pieces too, so the end result is - // the same. - continue - } - if t.storage.Capacity != primaryTorrent.storage.Capacity { - continue - } - input.Torrents = append(input.Torrents, t.requestStrategyTorrentInput()) - } - return -} - -func (t *Torrent) getRequestStrategyInput() request_strategy.Input { - return t.cl.getRequestStrategyInput(t) -} - -func (t *Torrent) requestStrategyTorrentInput() request_strategy.Torrent { - rst := request_strategy.Torrent{ - InfoHash: t.infoHash, - ChunksPerPiece: t.chunksPerRegularPiece(), - } - rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces)) - for i := range t.pieces { - p := &t.pieces[i] - rst.Pieces = append(rst.Pieces, request_strategy.Piece{ - Request: !t.ignorePieceForRequests(i), - Priority: p.purePriority(), - Partial: t.piecePartiallyDownloaded(i), - Availability: p.availability, - Length: int64(p.length()), - NumPendingChunks: int(t.pieceNumPendingChunks(i)), - IterPendingChunks: &p.undirtiedChunksIter, - }) - } - return rst } func init() { @@ -116,9 +67,8 @@ type ( ) type peerRequests struct { - requestIndexes []RequestIndex - peer *Peer - torrentStrategyInput *request_strategy.Torrent + requestIndexes []RequestIndex + peer *Peer } func (p *peerRequests) Len() int { @@ -129,22 +79,8 @@ func (p *peerRequests) Less(i, j int) bool { leftRequest := p.requestIndexes[i] rightRequest := p.requestIndexes[j] t := p.peer.t - leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece - rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece - leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest) - rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest) - pending := func(index RequestIndex, current bool) int { - ret := t.pendingRequests.Get(index) - if current { - ret-- - } - // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be - // resolved. - if ret < 0 { - panic(ret) - } - return ret - } + leftPieceIndex := leftRequest / t.chunksPerRegularPiece() + rightPieceIndex := rightRequest / t.chunksPerRegularPiece() ml := multiless.New() // Push requests that can't be served right now to the end. But we don't throw them away unless // there's a better alternative. This is for when we're using the fast extension and get choked @@ -155,20 +91,43 @@ func (p *peerRequests) Less(i, j int) bool { !p.peer.peerAllowedFast.Contains(rightPieceIndex), ) } + leftPeer := t.pendingRequests[leftRequest] + rightPeer := t.pendingRequests[rightRequest] + ml = ml.Bool(rightPeer == p.peer, leftPeer == p.peer) + ml = ml.Bool(rightPeer == nil, leftPeer == nil) + if ml.Ok() { + return ml.MustLess() + } + if leftPeer != nil { + // The right peer should also be set, or we'd have resolved the computation by now. + ml = ml.Uint64( + rightPeer.requestState.Requests.GetCardinality(), + leftPeer.requestState.Requests.GetCardinality(), + ) + // Could either of the lastRequested be Zero? That's what checking an existing peer is for. + leftLast := t.lastRequested[leftRequest] + rightLast := t.lastRequested[rightRequest] + if leftLast.IsZero() || rightLast.IsZero() { + panic("expected non-zero last requested times") + } + // We want the most-recently requested on the left. Clients like Transmission serve requests + // in received order, so the most recently-requested is the one that has the longest until + // it will be served and therefore is the best candidate to cancel. + ml = ml.CmpInt64(rightLast.Sub(leftLast).Nanoseconds()) + } + leftPiece := t.piece(int(leftPieceIndex)) + rightPiece := t.piece(int(rightPieceIndex)) ml = ml.Int( - pending(leftRequest, leftCurrent), - pending(rightRequest, rightCurrent)) - ml = ml.Bool(!leftCurrent, !rightCurrent) - ml = ml.Int( - -int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority), - -int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority), + // Technically we would be happy with the cached priority here, except we don't actually + // cache it anymore, and Torrent.piecePriority just does another lookup of *Piece to resolve + // the priority through Piece.purePriority, which is probably slower. + -int(leftPiece.purePriority()), + -int(rightPiece.purePriority()), ) ml = ml.Int( - int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability), - int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability)) - ml = ml.Uint32(leftPieceIndex, rightPieceIndex) - ml = ml.Uint32(leftRequest, rightRequest) - return ml.MustLess() + int(leftPiece.availability), + int(rightPiece.availability)) + return ml.Less() } func (p *peerRequests) Swap(i, j int) { @@ -187,59 +146,58 @@ func (p *peerRequests) Pop() interface{} { } type desiredRequestState struct { - Requests []RequestIndex + Requests peerRequests Interested bool } func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { + if !p.t.haveInfo() { + return + } input := p.t.getRequestStrategyInput() requestHeap := peerRequests{ peer: p, } - for i := range input.Torrents { - t := &input.Torrents[i] - if t.InfoHash == p.t.infoHash { - requestHeap.torrentStrategyInput = t - break - } - } request_strategy.GetRequestablePieces( input, - func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) { - if t.InfoHash != p.t.infoHash { + p.t.getPieceRequestOrder(), + func(ih InfoHash, pieceIndex int) { + if ih != p.t.infoHash { return } if !p.peerHasPiece(pieceIndex) { return } allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex) - rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) { + p.t.piece(pieceIndex).undirtiedChunksIter.Iter(func(ci request_strategy.ChunkIndex) { r := p.t.pieceRequestIndexOffset(pieceIndex) + ci - // if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) { + // if p.t.pendingRequests.Get(r) != 0 && !p.requestState.Requests.Contains(r) { // return // } if !allowedFast { - // We must signal interest to request this + // We must signal interest to request this. TODO: We could set interested if the + // peers pieces (minus the allowed fast set) overlap with our missing pieces if + // there are any readers, or any pending pieces. desired.Interested = true // We can make or will allow sustaining a request here if we're not choked, or // have made the request previously (presumably while unchoked), and haven't had // the peer respond yet (and the request was retained because we are using the // fast extension). - if p.peerChoking && !p.actualRequestState.Requests.Contains(r) { + if p.peerChoking && !p.requestState.Requests.Contains(r) { // We can't request this right now. return } } + if p.requestState.Cancelled.Contains(r) { + // Can't re-request. + return + } requestHeap.requestIndexes = append(requestHeap.requestIndexes, r) }) }, ) p.t.assertPendingRequests() - heap.Init(&requestHeap) - for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() { - requestIndex := heap.Pop(&requestHeap).(RequestIndex) - desired.Requests = append(desired.Requests, requestIndex) - } + desired.Requests = requestHeap return } @@ -261,78 +219,38 @@ func (p *Peer) maybeUpdateActualRequestState() bool { // Transmit/action the request state to the peer. func (p *Peer) applyRequestState(next desiredRequestState) bool { - current := &p.actualRequestState + current := &p.requestState if !p.setInterested(next.Interested) { return false } more := true - cancel := current.Requests.Clone() - for _, ri := range next.Requests { - cancel.Remove(ri) - } - cancel.Iterate(func(req uint32) bool { - more = p.cancel(req) - return more - }) - if !more { - return false - } - shuffled := false - lastPending := 0 - for i := 0; i < len(next.Requests); i++ { - req := next.Requests[i] - if p.cancelledRequests.Contains(req) { - // Waiting for a reject or piece message, which will suitably trigger us to update our - // requests, so we can skip this one with no additional consideration. - continue - } - // The cardinality of our desired requests shouldn't exceed the max requests since it's used - // in the calculation of the requests. However, if we cancelled requests and they haven't - // been rejected or serviced yet with the fast extension enabled, we can end up with more - // extra outstanding requests. We could subtract the number of outstanding cancels from the - // next request cardinality, but peers might not like that. - if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() { - // log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]", - // next.Requests.GetCardinality(), - // p.cancelledRequests.GetCardinality(), - // current.Requests.GetCardinality(), - // p.nominalMaxRequests(), - // ) - break - } - otherPending := p.t.pendingRequests.Get(next.Requests[0]) - if p.actualRequestState.Requests.Contains(next.Requests[0]) { - otherPending-- - } - if otherPending < lastPending { - // Pending should only rise. It's supposed to be the strongest ordering criteria. If it - // doesn't, our shuffling condition could be wrong. - panic(lastPending) - } - // If the request has already been requested by another peer, shuffle this and the rest of - // the requests (since according to the increasing condition, the rest of the indices - // already have an outstanding request with another peer). - if !shuffled && otherPending > 0 { - shuffleReqs := next.Requests[i:] - rand.Shuffle(len(shuffleReqs), func(i, j int) { - shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i] - }) - // log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests)) - shuffled = true - // Repeat this index - i-- - continue + requestHeap := &next.Requests + t := p.t + heap.Init(requestHeap) + for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()) < p.nominalMaxRequests() { + req := heap.Pop(requestHeap).(RequestIndex) + existing := t.requestingPeer(req) + if existing != nil && existing != p { + // Don't steal from the poor. + diff := int64(current.Requests.GetCardinality()) + 1 - (int64(existing.uncancelledRequests()) - 1) + // Steal a request that leaves us with one more request than the existing peer + // connection if the stealer more recently received a chunk. + if diff > 1 || (diff == 1 && p.lastUsefulChunkReceived.Before(existing.lastUsefulChunkReceived)) { + continue + } + t.cancelRequest(req) } - more = p.mustRequest(req) if !more { break } } + // TODO: This may need to change, we might want to update even if there were no requests due to + // filtering them for being recently requested already. p.updateRequestsTimer.Stop() if more { p.needRequestUpdate = "" - if !current.Requests.IsEmpty() { + if current.Interested { p.updateRequestsTimer.Reset(3 * time.Second) } } diff --git a/torrent-piece-request-order.go b/torrent-piece-request-order.go new file mode 100644 index 00000000..7b1ef978 --- /dev/null +++ b/torrent-piece-request-order.go @@ -0,0 +1,51 @@ +package torrent + +import ( + request_strategy "github.com/anacrolix/torrent/request-strategy" +) + +func (t *Torrent) updatePieceRequestOrder(pieceIndex int) { + t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()].Update( + t.pieceRequestOrderKey(pieceIndex), + t.requestStrategyPieceOrderState(pieceIndex)) +} + +func (t *Torrent) clientPieceRequestOrderKey() interface{} { + if t.storage.Capacity == nil { + return t + } + return t.storage.Capacity +} + +func (t *Torrent) deletePieceRequestOrder() { + cpro := t.cl.pieceRequestOrder + key := t.clientPieceRequestOrderKey() + pro := cpro[key] + for i := 0; i < t.numPieces(); i++ { + pro.Delete(t.pieceRequestOrderKey(i)) + } + if pro.Len() == 0 { + delete(cpro, key) + } +} + +func (t *Torrent) initPieceRequestOrder() { + if t.cl.pieceRequestOrder == nil { + t.cl.pieceRequestOrder = make(map[interface{}]*request_strategy.PieceRequestOrder) + } + key := t.clientPieceRequestOrderKey() + cpro := t.cl.pieceRequestOrder + if cpro[key] == nil { + cpro[key] = request_strategy.NewPieceOrder() + } +} + +func (t *Torrent) addRequestOrderPiece(i int) { + t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()].Add( + t.pieceRequestOrderKey(i), + t.requestStrategyPieceOrderState(i)) +} + +func (t *Torrent) getPieceRequestOrder() *request_strategy.PieceRequestOrder { + return t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()] +} diff --git a/torrent.go b/torrent.go index 7ccf13f6..508de54c 100644 --- a/torrent.go +++ b/torrent.go @@ -27,6 +27,7 @@ import ( "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/multiless" "github.com/anacrolix/sync" + request_strategy "github.com/anacrolix/torrent/request-strategy" "github.com/davecgh/go-spew/spew" "github.com/pion/datachannel" @@ -137,7 +138,8 @@ type Torrent struct { initialPieceCheckDisabled bool // Count of each request across active connections. - pendingRequests pendingRequests + pendingRequests map[RequestIndex]*Peer + lastRequested map[RequestIndex]time.Time // Chunks we've written to since the corresponding piece was last checked. dirtyChunks roaring.Bitmap @@ -165,6 +167,7 @@ func (t *Torrent) decPieceAvailability(i pieceIndex) { panic(p.availability) } p.availability-- + t.updatePieceRequestOrder(i) } func (t *Torrent) incPieceAvailability(i pieceIndex) { @@ -172,6 +175,7 @@ func (t *Torrent) incPieceAvailability(i pieceIndex) { if t.haveInfo() { p := t.piece(i) p.availability++ + t.updatePieceRequestOrder(i) } } @@ -424,8 +428,16 @@ func (t *Torrent) setInfo(info *metainfo.Info) error { return nil } +func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey { + return request_strategy.PieceRequestOrderKey{ + InfoHash: t.infoHash, + Index: i, + } +} + // This seems to be all the follow-up tasks after info is set, that can't fail. func (t *Torrent) onSetInfo() { + t.initPieceRequestOrder() for i := range t.pieces { p := &t.pieces[i] // Need to add availability before updating piece completion, as that may result in conns @@ -434,6 +446,7 @@ func (t *Torrent) onSetInfo() { panic(p.availability) } p.availability = int64(t.pieceAvailabilityFromPeers(i)) + t.addRequestOrderPiece(i) t.updatePieceCompletion(pieceIndex(i)) if !t.initialPieceCheckDisabled && !p.storageCompletionOk { // t.logger.Printf("piece %s completion unknown, queueing check", p) @@ -443,7 +456,8 @@ func (t *Torrent) onSetInfo() { t.cl.event.Broadcast() close(t.gotMetainfoC) t.updateWantPeersEvent() - t.pendingRequests.Init(t.numRequests()) + t.pendingRequests = make(map[RequestIndex]*Peer) + t.lastRequested = make(map[RequestIndex]time.Time) t.tryCreateMorePieceHashers() t.iterPeers(func(p *Peer) { p.onGotInfo(t.info) @@ -816,6 +830,9 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) { t.iterPeers(func(p *Peer) { p.close() }) + if t.storage != nil { + t.deletePieceRequestOrder() + } t.pex.Reset() t.cl.event.Broadcast() t.pieceStateChanges.Close() @@ -1082,9 +1099,9 @@ func (t *Torrent) maybeNewConns() { func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) { if t._pendingPieces.Contains(uint32(piece)) { t.iterPeers(func(c *Peer) { - if c.actualRequestState.Interested { - return - } + // if c.requestState.Interested { + // return + // } if !c.isLowOnRequests() { return } @@ -1102,6 +1119,11 @@ func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) { } func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) { + if !t.closed.IsSet() { + // It would be possible to filter on pure-priority changes here to avoid churning the piece + // request order. + t.updatePieceRequestOrder(piece) + } p := &t.pieces[piece] newPrio := p.uncachedPriority() // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio) @@ -1238,6 +1260,7 @@ func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool { } else { t._completedPieces.Remove(x) } + p.t.updatePieceRequestOrder(piece) t.updateComplete() if complete && len(p.dirtiers) != 0 { t.logger.Printf("marked piece %v complete but still has dirtiers", piece) @@ -1397,7 +1420,7 @@ func (t *Torrent) assertPendingRequests() { // actual.m = make([]int, t.numRequests()) // } // t.iterPeers(func(p *Peer) { - // p.actualRequestState.Requests.Iterate(func(x uint32) bool { + // p.requestState.Requests.Iterate(func(x uint32) bool { // actual.Inc(x) // return true // }) @@ -2274,3 +2297,16 @@ func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex { func (t *Torrent) updateComplete() { t.Complete.SetBool(t.haveAllPieces()) } + +func (t *Torrent) cancelRequest(r RequestIndex) *Peer { + p := t.pendingRequests[r] + if p != nil { + p.cancel(r) + } + delete(t.pendingRequests, r) + return p +} + +func (t *Torrent) requestingPeer(r RequestIndex) *Peer { + return t.pendingRequests[r] +} diff --git a/webseed-peer.go b/webseed-peer.go index 85e12119..221aa53f 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -49,17 +49,13 @@ func (ws *webseedPeer) writeInterested(interested bool) bool { } func (ws *webseedPeer) _cancel(r RequestIndex) bool { - active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)] - if ok { + if active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]; ok { active.Cancel() + // The requester is running and will handle the result. + return true } - if !ws.peer.deleteRequest(r) { - panic("cancelled webseed request should exist") - } - if ws.peer.isLowOnRequests() { - ws.peer.updateRequests("webseedPeer._cancel") - } - return true + // There should be no requester handling this, so no further events will occur. + return false } func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec { @@ -89,7 +85,7 @@ func (ws *webseedPeer) requester(i int) { start: for !ws.peer.closed.IsSet() { restart := false - ws.peer.actualRequestState.Requests.Iterate(func(x uint32) bool { + ws.peer.requestState.Requests.Iterate(func(x uint32) bool { r := ws.peer.t.requestIndexToRequest(x) if _, ok := ws.activeRequests[r]; ok { return true @@ -171,7 +167,9 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re log.Printf("closing %v", ws) ws.peer.close() } - ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) + if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) { + panic("invalid reject") + } return err } err = ws.peer.receiveChunk(&pp.Message{ @@ -187,7 +185,7 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re } func (me *webseedPeer) isLowOnRequests() bool { - return me.peer.actualRequestState.Requests.GetCardinality() < uint64(me.maxRequests) + return me.peer.requestState.Requests.GetCardinality() < uint64(me.maxRequests) } func (me *webseedPeer) peerPieces() *roaring.Bitmap {