From: Matt Joiner Date: Fri, 14 May 2021 03:06:12 +0000 (+1000) Subject: Get max unverified bytes working X-Git-Tag: v1.29.0~31^2~26 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=63b0e42731c61b34d2172354424cd7892a836320;p=btrtrc.git Get max unverified bytes working --- diff --git a/request-strategy.go b/request-strategy.go index 84473b40..8d8b3b23 100644 --- a/request-strategy.go +++ b/request-strategy.go @@ -24,11 +24,11 @@ func (cl *Client) requester() { } func (cl *Client) doRequests() { - ts := make([]*request_strategy.Torrent, 0, len(cl.torrents)) + ts := make([]request_strategy.Torrent, 0, len(cl.torrents)) for _, t := range cl.torrents { - rst := &request_strategy.Torrent{ - StableId: uintptr(unsafe.Pointer(t)), - //MaxUnverifiedBytes: 1 << 20, + rst := request_strategy.Torrent{ + StableId: uintptr(unsafe.Pointer(t)), + MaxUnverifiedBytes: 10 << 20, } if t.storage != nil { rst.Capacity = t.storage.Capacity diff --git a/request-strategy/order.go b/request-strategy/order.go index 4742254d..9052fb0c 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -16,46 +16,31 @@ type ( ChunkSpec = types.ChunkSpec ) -type ClientPieceOrder struct { - pieces []pieceRequestOrderPiece -} +type ClientPieceOrder struct{} -type orderTorrent struct { - *Torrent +type filterTorrent struct { + Torrent unverifiedBytes int64 // Potentially shared with other torrents. storageLeft *int64 - peers []*requestsPeer -} - -type pieceRequestOrderPiece struct { - t *orderTorrent - index pieceIndex - Piece } -func (me *ClientPieceOrder) Len() int { - return len(me.pieces) -} - -func (me ClientPieceOrder) sort() { - sort.Slice(me.pieces, me.less) -} - -func (me ClientPieceOrder) less(_i, _j int) bool { - i := me.pieces[_i] - j := me.pieces[_j] - return multiless.New().Int( - int(j.Priority), int(i.Priority), - ).Bool( - j.Partial, i.Partial, - ).Int64( - i.Availability, j.Availability, - ).Int( - i.index, j.index, - ).Uintptr( - i.t.StableId, j.t.StableId, - ).MustLess() +func sortFilterPieces(pieces []filterPiece) { + sort.Slice(pieces, func(_i, _j int) bool { + i := pieces[_i] + j := pieces[_j] + return multiless.New().Int( + int(j.Priority), int(i.Priority), + ).Bool( + j.Partial, i.Partial, + ).Int64( + i.Availability, j.Availability, + ).Int( + i.index, j.index, + ).Uintptr( + i.t.StableId, j.t.StableId, + ).MustLess() + }) } type requestsPeer struct { @@ -86,15 +71,27 @@ func (me *peersForPieceRequests) addNextRequest(r Request) { me.requestsInPiece++ } -func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId]PeerNextRequestState { - requestOrder.pieces = requestOrder.pieces[:0] +type requestablePiece struct { + index pieceIndex + t Torrent + NumPendingChunks int + IterPendingChunks ChunksIter +} + +type filterPiece struct { + t *filterTorrent + index pieceIndex + Piece +} + +func getRequestablePieces(torrents []Torrent) (ret []requestablePiece) { // Storage capacity left for this run, keyed by the storage capacity pointer on the storage // TorrentImpl. storageLeft := make(map[*func() *int64]*int64) - orderTorrents := make([]*orderTorrent, 0, len(torrents)) + var pieces []filterPiece for _, _t := range torrents { // TODO: We could do metainfo requests here. - t := &orderTorrent{ + t := &filterTorrent{ Torrent: _t, unverifiedBytes: 0, } @@ -105,34 +102,16 @@ func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId } t.storageLeft = storageLeft[key] } - var peers []*requestsPeer - for _, p := range t.Peers { - peers = append(peers, &requestsPeer{ - Peer: p, - nextState: PeerNextRequestState{ - Requests: make(map[Request]struct{}), - }, - }) - } for i, tp := range t.Pieces { - requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{ + pieces = append(pieces, filterPiece{ t: t, index: i, Piece: tp, }) - if tp.Request && tp.NumPendingChunks != 0 { - for _, p := range peers { - if p.canRequestPiece(i) { - p.requestablePiecesRemaining++ - } - } - } } - t.peers = peers - orderTorrents = append(orderTorrents, t) } - requestOrder.sort() - for _, piece := range requestOrder.pieces { + sortFilterPieces(pieces) + for _, piece := range pieces { if left := piece.t.storageLeft; left != nil { if *left < int64(piece.Length) { continue @@ -143,16 +122,48 @@ func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId continue } if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes { - //log.Print("skipping piece") continue } - allocatePendingChunks(piece, piece.t.peers) piece.t.unverifiedBytes += piece.Length - //log.Print(piece.t.unverifiedBytes) + ret = append(ret, requestablePiece{ + index: piece.index, + t: piece.t.Torrent, + NumPendingChunks: piece.NumPendingChunks, + IterPendingChunks: piece.iterPendingChunksWrapper, + }) + } + return +} + +// TODO: We could do metainfo requests here. +func (requestOrder *ClientPieceOrder) DoRequests(torrents []Torrent) map[PeerId]PeerNextRequestState { + requestPieces := getRequestablePieces(torrents) + allPeers := make(map[uintptr][]*requestsPeer, len(torrents)) + for _, t := range torrents { + peers := make([]*requestsPeer, 0, len(t.Peers)) + for _, p := range t.Peers { + peers = append(peers, &requestsPeer{ + Peer: p, + nextState: PeerNextRequestState{ + Requests: make(map[Request]struct{}), + }, + }) + } + allPeers[t.StableId] = peers + } + for _, piece := range requestPieces { + for _, peer := range allPeers[piece.t.StableId] { + if peer.canRequestPiece(piece.index) { + peer.requestablePiecesRemaining++ + } + } + } + for _, piece := range requestPieces { + allocatePendingChunks(piece, allPeers[piece.t.StableId]) } ret := make(map[PeerId]PeerNextRequestState) - for _, ots := range orderTorrents { - for _, rp := range ots.peers { + for _, peers := range allPeers { + for _, rp := range peers { if rp.requestablePiecesRemaining != 0 { panic(rp.requestablePiecesRemaining) } @@ -162,7 +173,7 @@ func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId return ret } -func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) { +func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) { peersForPiece := make([]*peersForPieceRequests, 0, len(peers)) for _, peer := range peers { peersForPiece = append(peersForPiece, &peersForPieceRequests{ @@ -204,7 +215,7 @@ func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) { }) } preallocated := make(map[ChunkSpec]*peersForPieceRequests, p.NumPendingChunks) - p.iterPendingChunksWrapper(func(spec ChunkSpec) { + p.IterPendingChunks(func(spec ChunkSpec) { req := Request{pp.Integer(p.index), spec} for _, peer := range peersForPiece { if h := peer.HasExistingRequest; h == nil || !h(req) { @@ -221,7 +232,7 @@ func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) { } }) pendingChunksRemaining := int(p.NumPendingChunks) - p.iterPendingChunksWrapper(func(chunk types.ChunkSpec) { + p.IterPendingChunks(func(chunk types.ChunkSpec) { if _, ok := preallocated[chunk]; ok { return } @@ -243,11 +254,12 @@ func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) { } } peer.addNextRequest(req) - return + break } }) chunk: for chunk, prePeer := range preallocated { + pendingChunksRemaining-- req := Request{pp.Integer(p.index), chunk} prePeer.requestsInPiece-- sortPeersForPiece(&req) @@ -266,7 +278,6 @@ chunk: continue } } - pendingChunksRemaining-- peer.addNextRequest(req) continue chunk } diff --git a/request-strategy/order_test.go b/request-strategy/order_test.go index 8c518b50..fd8b53f0 100644 --- a/request-strategy/order_test.go +++ b/request-strategy/order_test.go @@ -64,7 +64,7 @@ func TestStealingFromSlowerPeer(t *testing.T) { firstStealer.Id = intPeerId(2) secondStealer := basePeer secondStealer.Id = intPeerId(3) - results := order.DoRequests([]*Torrent{{ + results := order.DoRequests([]Torrent{{ Pieces: []Piece{{ Request: true, NumPendingChunks: 5, @@ -111,7 +111,7 @@ func TestStealingFromSlowerPeersBasic(t *testing.T) { firstStealer.Id = intPeerId(2) secondStealer := basePeer secondStealer.Id = intPeerId(3) - results := order.DoRequests([]*Torrent{{ + results := order.DoRequests([]Torrent{{ Pieces: []Piece{{ Request: true, NumPendingChunks: 2, @@ -150,7 +150,7 @@ func TestPeerKeepsExistingIfReasonable(t *testing.T) { firstStealer.Id = intPeerId(2) secondStealer := basePeer secondStealer.Id = intPeerId(3) - results := order.DoRequests([]*Torrent{{ + results := order.DoRequests([]Torrent{{ Pieces: []Piece{{ Request: true, NumPendingChunks: 4, @@ -198,7 +198,7 @@ func TestDontStealUnnecessarily(t *testing.T) { firstStealer.Id = intPeerId(2) secondStealer := basePeer secondStealer.Id = intPeerId(3) - results := order.DoRequests([]*Torrent{{ + results := order.DoRequests([]Torrent{{ Pieces: []Piece{{ Request: true, NumPendingChunks: 9, diff --git a/request-strategy/piece.go b/request-strategy/piece.go index 508ed829..bc59c052 100644 --- a/request-strategy/piece.go +++ b/request-strategy/piece.go @@ -4,6 +4,8 @@ import ( "github.com/anacrolix/torrent/types" ) +type ChunksIter func(func(types.ChunkSpec)) + type Piece struct { Request bool Priority piecePriority @@ -11,10 +13,10 @@ type Piece struct { Availability int64 Length int64 NumPendingChunks int - IterPendingChunks func(func(types.ChunkSpec)) + IterPendingChunks ChunksIter } -func (p *Piece) iterPendingChunksWrapper(f func(ChunkSpec)) { +func (p Piece) iterPendingChunksWrapper(f func(ChunkSpec)) { i := p.IterPendingChunks if i != nil { i(f)