From: Matt Joiner Date: Wed, 1 Dec 2021 08:21:25 +0000 (+1100) Subject: Use interfaces to lazily expose the bare minimum inputs to GetRequestablePieces X-Git-Tag: v1.39.0^2~13 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=135f21fb644877446d811bc00f0c93e17d5b7aba;p=btrtrc.git Use interfaces to lazily expose the bare minimum inputs to GetRequestablePieces --- diff --git a/request-strategy-impls.go b/request-strategy-impls.go new file mode 100644 index 00000000..f4c12646 --- /dev/null +++ b/request-strategy-impls.go @@ -0,0 +1,74 @@ +package torrent + +import ( + "github.com/anacrolix/torrent/metainfo" + request_strategy "github.com/anacrolix/torrent/request-strategy" + "github.com/anacrolix/torrent/storage" +) + +type requestStrategyInput struct { + cl *Client + capFunc storage.TorrentCapacity +} + +func (r requestStrategyInput) Torrent(ih metainfo.Hash) request_strategy.Torrent { + return requestStrategyTorrent{r.cl.torrents[ih]} +} + +func (r requestStrategyInput) Capacity() (int64, bool) { + if r.capFunc == nil { + return 0, false + } + return (*r.capFunc)() +} + +func (r requestStrategyInput) MaxUnverifiedBytes() int64 { + return r.cl.config.MaxUnverifiedBytes +} + +var _ request_strategy.Input = requestStrategyInput{} + +// Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent. +func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) { + return requestStrategyInput{ + cl: cl, + capFunc: primaryTorrent.storage.Capacity, + } +} + +func (t *Torrent) getRequestStrategyInput() request_strategy.Input { + return t.cl.getRequestStrategyInput(t) +} + +type requestStrategyTorrent struct { + t *Torrent +} + +func (r requestStrategyTorrent) Piece(i int) request_strategy.Piece { + return requestStrategyPiece{r.t, i} +} + +func (r requestStrategyTorrent) ChunksPerPiece() uint32 { + return r.t.chunksPerRegularPiece() +} + +func (r requestStrategyTorrent) PieceLength() int64 { + return r.t.info.PieceLength +} + +var _ request_strategy.Torrent = requestStrategyTorrent{} + +type requestStrategyPiece struct { + t *Torrent + i pieceIndex +} + +func (r requestStrategyPiece) Request() bool { + return !r.t.ignorePieceForRequests(r.i) +} + +func (r requestStrategyPiece) NumPendingChunks() int { + return int(r.t.pieceNumPendingChunks(r.i)) +} + +var _ request_strategy.Piece = requestStrategyPiece{} diff --git a/request-strategy/order.go b/request-strategy/order.go index adbd29e4..4a86aae9 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -3,9 +3,6 @@ package request_strategy import ( "bytes" "expvar" - "runtime" - "sort" - "sync" "github.com/anacrolix/multiless" "github.com/anacrolix/torrent/metainfo" @@ -24,46 +21,6 @@ type ( ChunkSpec = types.ChunkSpec ) -type ClientPieceOrder struct{} - -func equalFilterPieces(l, r []filterPiece) bool { - if len(l) != len(r) { - return false - } - for i := range l { - lp := &l[i] - rp := &r[i] - if lp.Priority != rp.Priority || - lp.Partial != rp.Partial || - lp.Availability != rp.Availability || - lp.index != rp.index || - lp.t.InfoHash != rp.t.InfoHash { - return false - } - } - return true -} - -type pieceSorter struct { - swap func(i, j int) - get func(i int) *filterPiece - len int -} - -func (me pieceSorter) Len() int { - return me.len -} - -func (me pieceSorter) Swap(i, j int) { - me.swap(i, j) -} - -func (me pieceSorter) Less(_i, _j int) bool { - i := me.get(_i) - j := me.get(_j) - return pieceOrderLess(i.toPieceOrderInput(), j.toPieceOrderInput()).MustLess() -} - type pieceOrderInput struct { PieceRequestOrderState PieceRequestOrderKey @@ -112,339 +69,49 @@ func (me *peersForPieceRequests) addNextRequest(r RequestIndex) { me.requestsInPiece++ } -type requestablePiece struct { - index pieceIndex - t *Torrent - alwaysReallocate bool - NumPendingChunks int - IterPendingChunks ChunksIterFunc -} - -func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex { - return p.t.ChunksPerPiece*uint32(p.index) + c -} - -type filterPiece struct { - t *Torrent - index pieceIndex - *Piece -} - -func (fp *filterPiece) toPieceOrderInput() (ret pieceOrderInput) { - ret.Partial = fp.Partial - ret.InfoHash = fp.t.InfoHash - ret.Availability = fp.Availability - ret.Priority = fp.Priority - ret.Index = fp.index - return -} - -var ( - sortsMu sync.Mutex - sorts = map[*[]filterPiece][]int{} -) - -func reorderedFilterPieces(pieces []filterPiece, indices []int) (ret []filterPiece) { - ret = make([]filterPiece, len(indices)) - for i, j := range indices { - ret[i] = pieces[j] - } - return -} - var packageExpvarMap = expvar.NewMap("request-strategy") -func getSortedFilterPieces(unsorted []filterPiece) []filterPiece { - const cachePieceSorts = false - if !cachePieceSorts { - sort.Sort(pieceSorter{ - len: len(unsorted), - swap: func(i, j int) { - unsorted[i], unsorted[j] = unsorted[j], unsorted[i] - }, - get: func(i int) *filterPiece { - return &unsorted[i] - }, - }) - return unsorted - } - sortsMu.Lock() - defer sortsMu.Unlock() - for key, order := range sorts { - if equalFilterPieces(*key, unsorted) { - packageExpvarMap.Add("reused filter piece ordering", 1) - return reorderedFilterPieces(unsorted, order) - } - } - indices := make([]int, len(unsorted)) - for i := 0; i < len(indices); i++ { - indices[i] = i - } - sort.Sort(pieceSorter{ - len: len(unsorted), - swap: func(i, j int) { - indices[i], indices[j] = indices[j], indices[i] - }, - get: func(i int) *filterPiece { - return &unsorted[indices[i]] - }, - }) - packageExpvarMap.Add("added filter piece ordering", 1) - sorts[&unsorted] = indices - runtime.SetFinalizer(&pieceOrderingFinalizer{unsorted: &unsorted}, func(me *pieceOrderingFinalizer) { - packageExpvarMap.Add("finalized filter piece ordering", 1) - sortsMu.Lock() - defer sortsMu.Unlock() - delete(sorts, me.unsorted) - }) - return reorderedFilterPieces(unsorted, indices) -} - -type pieceOrderingFinalizer struct { - unsorted *[]filterPiece -} - // Calls f with requestable pieces in order. -func GetRequestablePieces(input Input, pro *PieceRequestOrder, f func(t *Torrent, p *Piece, pieceIndex int)) { +func GetRequestablePieces(input Input, pro *PieceRequestOrder, f func(ih metainfo.Hash, pieceIndex int)) { // Storage capacity left for this run, keyed by the storage capacity pointer on the storage // TorrentImpl. A nil value means no capacity limit. var storageLeft *int64 - if input.Capacity != nil { - storageLeft = new(int64) - *storageLeft = *input.Capacity + if cap, ok := input.Capacity(); ok { + storageLeft = &cap } var allTorrentsUnverifiedBytes int64 - torrentUnverifiedBytes := map[metainfo.Hash]int64{} pro.tree.Ascend(func(i btree.Item) bool { _i := i.(pieceRequestOrderItem) - var t Torrent = input.Torrents[_i.key.InfoHash] - var piece *Piece = &t.Pieces[_i.key.Index] - if left := storageLeft; left != nil { - if *left < piece.Length { + ih := _i.key.InfoHash + var t Torrent = input.Torrent(ih) + var piece Piece = t.Piece(_i.key.Index) + pieceLength := t.PieceLength() + if storageLeft != nil { + if *storageLeft < pieceLength { return true } - *left -= piece.Length + *storageLeft -= pieceLength } - if !piece.Request || piece.NumPendingChunks == 0 { + if !piece.Request() || piece.NumPendingChunks() == 0 { // TODO: Clarify exactly what is verified. Stuff that's being hashed should be // considered unverified and hold up further requests. - - return true - } - if t.MaxUnverifiedBytes != 0 && torrentUnverifiedBytes[t.InfoHash]+piece.Length > t.MaxUnverifiedBytes { return true } - if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes { + if input.MaxUnverifiedBytes() != 0 && allTorrentsUnverifiedBytes+pieceLength > input.MaxUnverifiedBytes() { return true } - torrentUnverifiedBytes[t.InfoHash] += piece.Length - allTorrentsUnverifiedBytes += piece.Length - f(&t, piece, _i.key.Index) + allTorrentsUnverifiedBytes += pieceLength + f(ih, _i.key.Index) return true }) return } -type Input struct { - // This is all torrents that share the same capacity below (or likely a single torrent if there - // is infinite capacity, since you could just run it separately for each Torrent if that's the - // case). - Torrents map[metainfo.Hash]Torrent - // Must not be modified. Non-nil if capacity is not infinite, meaning that pieces of torrents - // that share the same capacity key must be incorporated in piece ordering. - Capacity *int64 +type Input interface { + Torrent(metainfo.Hash) Torrent + // Storage capacity, shared among all Torrents with the same storage.TorrentCapacity pointer in + // their storage.Torrent references. + Capacity() (cap int64, capped bool) // Across all the Torrents. This might be partitioned by storage capacity key now. - MaxUnverifiedBytes int64 -} - -// Checks that a sorted peersForPiece slice makes sense. -func ensureValidSortedPeersForPieceRequests(peers *peersForPieceSorter) { - if !sort.IsSorted(peers) { - panic("not sorted") - } - peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len()) - for _, p := range peers.peersForPiece { - if _, ok := peerMap[p]; ok { - panic(p) - } - peerMap[p] = struct{}{} - } -} - -var peersForPiecesPool sync.Pool - -func makePeersForPiece(cap int) []*peersForPieceRequests { - got := peersForPiecesPool.Get() - if got == nil { - return make([]*peersForPieceRequests, 0, cap) - } - return got.([]*peersForPieceRequests)[:0] -} - -type peersForPieceSorter struct { - peersForPiece []*peersForPieceRequests - req *RequestIndex - p requestablePiece -} - -func (me *peersForPieceSorter) Len() int { - return len(me.peersForPiece) -} - -func (me *peersForPieceSorter) Swap(i, j int) { - me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i] -} - -func (me *peersForPieceSorter) Less(_i, _j int) bool { - i := me.peersForPiece[_i] - j := me.peersForPiece[_j] - req := me.req - p := &me.p - byHasRequest := func() multiless.Computation { - ml := multiless.New() - if req != nil { - iHas := i.nextState.Requests.Contains(*req) - jHas := j.nextState.Requests.Contains(*req) - ml = ml.Bool(jHas, iHas) - } - return ml - }() - ml := multiless.New() - // We always "reallocate", that is force even striping amongst peers that are either on - // the last piece they can contribute too, or for pieces marked for this behaviour. - // Striping prevents starving peers of requests, and will always re-balance to the - // fastest known peers. - if !p.alwaysReallocate { - ml = ml.Bool( - j.requestablePiecesRemaining == 1, - i.requestablePiecesRemaining == 1) - } - if p.alwaysReallocate || j.requestablePiecesRemaining == 1 { - ml = ml.Int( - i.requestsInPiece, - j.requestsInPiece) - } else { - ml = ml.AndThen(byHasRequest) - } - ml = ml.Int( - i.requestablePiecesRemaining, - j.requestablePiecesRemaining, - ).Float64( - j.DownloadRate, - i.DownloadRate, - ) - if ml.Ok() { - return ml.Less() - } - ml = ml.AndThen(byHasRequest) - return ml.Int64( - int64(j.Age), int64(i.Age), - // TODO: Probably peer priority can come next - ).Uintptr( - i.Id.Uintptr(), - j.Id.Uintptr(), - ).MustLess() -} - -func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) { - peersForPiece := makePeersForPiece(len(peers)) - for _, peer := range peers { - if !peer.canRequestPiece(p.index) { - continue - } - if !peer.canFitRequest() { - peer.requestablePiecesRemaining-- - continue - } - peersForPiece = append(peersForPiece, &peersForPieceRequests{ - requestsInPiece: 0, - requestsPeer: peer, - }) - } - defer func() { - for _, peer := range peersForPiece { - peer.requestablePiecesRemaining-- - } - peersForPiecesPool.Put(peersForPiece) - }() - peersForPieceSorter := peersForPieceSorter{ - peersForPiece: peersForPiece, - p: p, - } - sortPeersForPiece := func(req *RequestIndex) { - peersForPieceSorter.req = req - sort.Sort(&peersForPieceSorter) - // ensureValidSortedPeersForPieceRequests(&peersForPieceSorter) - } - // Chunks can be preassigned several times, if peers haven't been able to update their "actual" - // with "next" request state before another request strategy run occurs. - preallocated := make([][]*peersForPieceRequests, p.t.ChunksPerPiece) - p.IterPendingChunks(func(spec ChunkIndex) { - req := p.chunkIndexToRequestIndex(spec) - for _, peer := range peersForPiece { - if !peer.ExistingRequests.Contains(req) { - continue - } - if !peer.canFitRequest() { - continue - } - preallocated[spec] = append(preallocated[spec], peer) - peer.addNextRequest(req) - } - }) - pendingChunksRemaining := int(p.NumPendingChunks) - p.IterPendingChunks(func(chunk ChunkIndex) { - if len(preallocated[chunk]) != 0 { - return - } - req := p.chunkIndexToRequestIndex(chunk) - defer func() { pendingChunksRemaining-- }() - sortPeersForPiece(nil) - for _, peer := range peersForPiece { - if !peer.canFitRequest() { - continue - } - if !peer.PieceAllowedFast.ContainsInt(p.index) { - // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces. - peer.nextState.Interested = true - if peer.Choking { - continue - } - } - peer.addNextRequest(req) - break - } - }) -chunk: - for chunk, prePeers := range preallocated { - if len(prePeers) == 0 { - continue - } - pendingChunksRemaining-- - req := p.chunkIndexToRequestIndex(ChunkIndex(chunk)) - for _, pp := range prePeers { - pp.requestsInPiece-- - } - sortPeersForPiece(&req) - for _, pp := range prePeers { - pp.nextState.Requests.Remove(req) - } - for _, peer := range peersForPiece { - if !peer.canFitRequest() { - continue - } - if !peer.PieceAllowedFast.ContainsInt(p.index) { - // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces. - peer.nextState.Interested = true - if peer.Choking { - continue - } - } - peer.addNextRequest(req) - continue chunk - } - } - if pendingChunksRemaining != 0 { - panic(pendingChunksRemaining) - } + MaxUnverifiedBytes() int64 } diff --git a/request-strategy/piece.go b/request-strategy/piece.go index 8a038e67..626cc75b 100644 --- a/request-strategy/piece.go +++ b/request-strategy/piece.go @@ -6,19 +6,7 @@ type ChunksIter interface { Iter(func(ci ChunkIndex)) } -type Piece struct { - Request bool - Priority piecePriority - Partial bool - Availability int64 - Length int64 - NumPendingChunks int - IterPendingChunks ChunksIter -} - -func (p Piece) iterPendingChunksWrapper(f func(ChunkIndex)) { - i := p.IterPendingChunks - if i != nil { - i.Iter(f) - } +type Piece interface { + Request() bool + NumPendingChunks() int } diff --git a/request-strategy/torrent.go b/request-strategy/torrent.go index dbb41df3..51fc1a6c 100644 --- a/request-strategy/torrent.go +++ b/request-strategy/torrent.go @@ -1,14 +1,7 @@ package request_strategy -import ( - "github.com/anacrolix/torrent/metainfo" -) - -type Torrent struct { - Pieces []Piece - // Some value that's unique and stable between runs. - InfoHash metainfo.Hash - ChunksPerPiece uint32 - // TODO: This isn't actually configurable anywhere yet. - MaxUnverifiedBytes int64 +type Torrent interface { + Piece(int) Piece + ChunksPerPiece() uint32 + PieceLength() int64 } diff --git a/requesting.go b/requesting.go index 1e95cbcf..5e02b79b 100644 --- a/requesting.go +++ b/requesting.go @@ -12,54 +12,10 @@ import ( "github.com/anacrolix/log" "github.com/anacrolix/multiless" - "github.com/anacrolix/torrent/metainfo" request_strategy "github.com/anacrolix/torrent/request-strategy" ) -// Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent. -func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) { - input.MaxUnverifiedBytes = cl.config.MaxUnverifiedBytes - if !primaryTorrent.haveInfo() { - return - } - if capFunc := primaryTorrent.storage.Capacity; capFunc != nil { - if cap, ok := (*capFunc)(); ok { - input.Capacity = &cap - } - } - input.Torrents = make(map[metainfo.Hash]request_strategy.Torrent, len(cl.torrents)) - for _, t := range cl.torrents { - if !t.haveInfo() { - // This would be removed if metadata is handled here. Determining chunks per piece - // requires the info. If we have no info, we have no pieces too, so the end result is - // the same. - continue - } - if t.storage.Capacity != primaryTorrent.storage.Capacity { - continue - } - input.Torrents[t.infoHash] = t.requestStrategyTorrentInput() - } - return -} - -func (t *Torrent) getRequestStrategyInput() request_strategy.Input { - return t.cl.getRequestStrategyInput(t) -} - -func (t *Torrent) requestStrategyTorrentInput() request_strategy.Torrent { - rst := request_strategy.Torrent{ - InfoHash: t.infoHash, - ChunksPerPiece: t.chunksPerRegularPiece(), - } - rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces)) - for i := range t.pieces { - rst.Pieces = append(rst.Pieces, t.makeRequestStrategyPiece(i)) - } - return rst -} - func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState { return request_strategy.PieceRequestOrderState{ Priority: t.piece(i).purePriority(), @@ -68,19 +24,6 @@ func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRe } } -func (t *Torrent) makeRequestStrategyPiece(i int) request_strategy.Piece { - p := &t.pieces[i] - return request_strategy.Piece{ - Request: !t.ignorePieceForRequests(i), - Priority: p.purePriority(), - Partial: t.piecePartiallyDownloaded(i), - Availability: p.availability, - Length: int64(p.length()), - NumPendingChunks: int(t.pieceNumPendingChunks(i)), - IterPendingChunks: &p.undirtiedChunksIter, - } -} - func init() { gob.Register(peerId{}) } @@ -125,9 +68,8 @@ type ( ) type peerRequests struct { - requestIndexes []RequestIndex - peer *Peer - torrentStrategyInput request_strategy.Torrent + requestIndexes []RequestIndex + peer *Peer } func (p *peerRequests) Len() int { @@ -138,8 +80,8 @@ func (p *peerRequests) Less(i, j int) bool { leftRequest := p.requestIndexes[i] rightRequest := p.requestIndexes[j] t := p.peer.t - leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece - rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece + leftPieceIndex := leftRequest / t.chunksPerRegularPiece() + rightPieceIndex := rightRequest / t.chunksPerRegularPiece() leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest) rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest) pending := func(index RequestIndex, current bool) int { @@ -168,13 +110,15 @@ func (p *peerRequests) Less(i, j int) bool { pending(leftRequest, leftCurrent), pending(rightRequest, rightCurrent)) ml = ml.Bool(!leftCurrent, !rightCurrent) + leftPiece := t.piece(int(leftPieceIndex)) + rightPiece := t.piece(int(rightPieceIndex)) ml = ml.Int( - -int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority), - -int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority), + -int(leftPiece.purePriority()), + -int(rightPiece.purePriority()), ) ml = ml.Int( - int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability), - int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability)) + int(leftPiece.availability), + int(rightPiece.availability)) ml = ml.Uint32(leftPieceIndex, rightPieceIndex) ml = ml.Uint32(leftRequest, rightRequest) return ml.MustLess() @@ -205,19 +149,18 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { requestHeap := peerRequests{ peer: p, } - requestHeap.torrentStrategyInput = input.Torrents[p.t.infoHash] request_strategy.GetRequestablePieces( input, p.t.cl.pieceRequestOrder[p.t.storage.Capacity], - func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) { - if t.InfoHash != p.t.infoHash { + func(ih InfoHash, pieceIndex int) { + if ih != p.t.infoHash { return } if !p.peerHasPiece(pieceIndex) { return } allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex) - rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) { + p.t.piece(pieceIndex).undirtiedChunksIter.Iter(func(ci request_strategy.ChunkIndex) { r := p.t.pieceRequestIndexOffset(pieceIndex) + ci // if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) { // return