From 307d6d178fc6502a800380a642460b6cf4f75e94 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 14 May 2021 11:50:41 +1000 Subject: [PATCH] Prepare to implement max unverified bytes --- request-strategy.go | 5 ++- request-strategy/order.go | 61 ++++++++++++++++++++++++------------- request-strategy/torrent.go | 11 +++++++ 3 files changed, 55 insertions(+), 22 deletions(-) create mode 100644 request-strategy/torrent.go diff --git a/request-strategy.go b/request-strategy.go index 21596a23..84473b40 100644 --- a/request-strategy.go +++ b/request-strategy.go @@ -26,7 +26,10 @@ func (cl *Client) requester() { func (cl *Client) doRequests() { ts := make([]*request_strategy.Torrent, 0, len(cl.torrents)) for _, t := range cl.torrents { - rst := &request_strategy.Torrent{} + rst := &request_strategy.Torrent{ + StableId: uintptr(unsafe.Pointer(t)), + //MaxUnverifiedBytes: 1 << 20, + } if t.storage != nil { rst.Capacity = t.storage.Capacity } diff --git a/request-strategy/order.go b/request-strategy/order.go index 56002594..4742254d 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -20,8 +20,16 @@ type ClientPieceOrder struct { pieces []pieceRequestOrderPiece } +type orderTorrent struct { + *Torrent + unverifiedBytes int64 + // Potentially shared with other torrents. + storageLeft *int64 + peers []*requestsPeer +} + type pieceRequestOrderPiece struct { - t *Torrent + t *orderTorrent index pieceIndex Piece } @@ -41,7 +49,13 @@ func (me ClientPieceOrder) less(_i, _j int) bool { int(j.Priority), int(i.Priority), ).Bool( j.Partial, i.Partial, - ).Int64(i.Availability, j.Availability).Int(i.index, j.index).Less() + ).Int64( + i.Availability, j.Availability, + ).Int( + i.index, j.index, + ).Uintptr( + i.t.StableId, j.t.StableId, + ).MustLess() } type requestsPeer struct { @@ -72,25 +86,24 @@ func (me *peersForPieceRequests) addNextRequest(r Request) { me.requestsInPiece++ } -type Torrent struct { - Pieces []Piece - Capacity *func() *int64 - Peers []Peer // not closed. -} - func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId]PeerNextRequestState { requestOrder.pieces = requestOrder.pieces[:0] - allPeers := make(map[*Torrent][]*requestsPeer) // Storage capacity left for this run, keyed by the storage capacity pointer on the storage // TorrentImpl. storageLeft := make(map[*func() *int64]*int64) - for _, t := range torrents { + orderTorrents := make([]*orderTorrent, 0, len(torrents)) + for _, _t := range torrents { // TODO: We could do metainfo requests here. + t := &orderTorrent{ + Torrent: _t, + unverifiedBytes: 0, + } key := t.Capacity if key != nil { if _, ok := storageLeft[key]; !ok { storageLeft[key] = (*key)() } + t.storageLeft = storageLeft[key] } var peers []*requestsPeer for _, p := range t.Peers { @@ -107,7 +120,7 @@ func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId index: i, Piece: tp, }) - if tp.Request { + if tp.Request && tp.NumPendingChunks != 0 { for _, p := range peers { if p.canRequestPiece(i) { p.requestablePiecesRemaining++ @@ -115,25 +128,31 @@ func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId } } } - allPeers[t] = peers + t.peers = peers + orderTorrents = append(orderTorrents, t) } requestOrder.sort() - for _, p := range requestOrder.pieces { - torrentPiece := p - if left := storageLeft[p.t.Capacity]; left != nil { - if *left < int64(torrentPiece.Length) { + for _, piece := range requestOrder.pieces { + if left := piece.t.storageLeft; left != nil { + if *left < int64(piece.Length) { continue } - *left -= int64(torrentPiece.Length) + *left -= int64(piece.Length) + } + if !piece.Request || piece.NumPendingChunks == 0 { + continue } - if !p.Request { + if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes { + //log.Print("skipping piece") continue } - allocatePendingChunks(p, allPeers[p.t]) + allocatePendingChunks(piece, piece.t.peers) + piece.t.unverifiedBytes += piece.Length + //log.Print(piece.t.unverifiedBytes) } ret := make(map[PeerId]PeerNextRequestState) - for _, peers := range allPeers { - for _, rp := range peers { + for _, ots := range orderTorrents { + for _, rp := range ots.peers { if rp.requestablePiecesRemaining != 0 { panic(rp.requestablePiecesRemaining) } diff --git a/request-strategy/torrent.go b/request-strategy/torrent.go new file mode 100644 index 00000000..2090c7a8 --- /dev/null +++ b/request-strategy/torrent.go @@ -0,0 +1,11 @@ +package request_strategy + +type Torrent struct { + Pieces []Piece + Capacity *func() *int64 + Peers []Peer // not closed. + // Some value that's unique and stable between runs. Could even use the infohash? + StableId uintptr + + MaxUnverifiedBytes int64 +} -- 2.44.0