]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Get max unverified bytes working
authorMatt Joiner <anacrolix@gmail.com>
Fri, 14 May 2021 03:06:12 +0000 (13:06 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 7 Jun 2021 03:01:39 +0000 (13:01 +1000)
request-strategy.go
request-strategy/order.go
request-strategy/order_test.go
request-strategy/piece.go

index 84473b404e27e05548595ff0242a8109bb670b32..8d8b3b233dc6e25884ede25278da6430448b5b95 100644 (file)
@@ -24,11 +24,11 @@ func (cl *Client) requester() {
 }
 
 func (cl *Client) doRequests() {
-       ts := make([]*request_strategy.Torrent, 0, len(cl.torrents))
+       ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
        for _, t := range cl.torrents {
-               rst := &request_strategy.Torrent{
-                       StableId: uintptr(unsafe.Pointer(t)),
-                       //MaxUnverifiedBytes: 1 << 20,
+               rst := request_strategy.Torrent{
+                       StableId:           uintptr(unsafe.Pointer(t)),
+                       MaxUnverifiedBytes: 10 << 20,
                }
                if t.storage != nil {
                        rst.Capacity = t.storage.Capacity
index 4742254d98365b24c6a4b4282e21bafc68102f50..9052fb0cc5514bf9dd62bdc20e90649184878415 100644 (file)
@@ -16,46 +16,31 @@ type (
        ChunkSpec = types.ChunkSpec
 )
 
-type ClientPieceOrder struct {
-       pieces []pieceRequestOrderPiece
-}
+type ClientPieceOrder struct{}
 
-type orderTorrent struct {
-       *Torrent
+type filterTorrent struct {
+       Torrent
        unverifiedBytes int64
        // Potentially shared with other torrents.
        storageLeft *int64
-       peers       []*requestsPeer
-}
-
-type pieceRequestOrderPiece struct {
-       t     *orderTorrent
-       index pieceIndex
-       Piece
 }
 
-func (me *ClientPieceOrder) Len() int {
-       return len(me.pieces)
-}
-
-func (me ClientPieceOrder) sort() {
-       sort.Slice(me.pieces, me.less)
-}
-
-func (me ClientPieceOrder) less(_i, _j int) bool {
-       i := me.pieces[_i]
-       j := me.pieces[_j]
-       return multiless.New().Int(
-               int(j.Priority), int(i.Priority),
-       ).Bool(
-               j.Partial, i.Partial,
-       ).Int64(
-               i.Availability, j.Availability,
-       ).Int(
-               i.index, j.index,
-       ).Uintptr(
-               i.t.StableId, j.t.StableId,
-       ).MustLess()
+func sortFilterPieces(pieces []filterPiece) {
+       sort.Slice(pieces, func(_i, _j int) bool {
+               i := pieces[_i]
+               j := pieces[_j]
+               return multiless.New().Int(
+                       int(j.Priority), int(i.Priority),
+               ).Bool(
+                       j.Partial, i.Partial,
+               ).Int64(
+                       i.Availability, j.Availability,
+               ).Int(
+                       i.index, j.index,
+               ).Uintptr(
+                       i.t.StableId, j.t.StableId,
+               ).MustLess()
+       })
 }
 
 type requestsPeer struct {
@@ -86,15 +71,27 @@ func (me *peersForPieceRequests) addNextRequest(r Request) {
        me.requestsInPiece++
 }
 
-func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId]PeerNextRequestState {
-       requestOrder.pieces = requestOrder.pieces[:0]
+type requestablePiece struct {
+       index             pieceIndex
+       t                 Torrent
+       NumPendingChunks  int
+       IterPendingChunks ChunksIter
+}
+
+type filterPiece struct {
+       t     *filterTorrent
+       index pieceIndex
+       Piece
+}
+
+func getRequestablePieces(torrents []Torrent) (ret []requestablePiece) {
        // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
        // TorrentImpl.
        storageLeft := make(map[*func() *int64]*int64)
-       orderTorrents := make([]*orderTorrent, 0, len(torrents))
+       var pieces []filterPiece
        for _, _t := range torrents {
                // TODO: We could do metainfo requests here.
-               t := &orderTorrent{
+               t := &filterTorrent{
                        Torrent:         _t,
                        unverifiedBytes: 0,
                }
@@ -105,34 +102,16 @@ func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId
                        }
                        t.storageLeft = storageLeft[key]
                }
-               var peers []*requestsPeer
-               for _, p := range t.Peers {
-                       peers = append(peers, &requestsPeer{
-                               Peer: p,
-                               nextState: PeerNextRequestState{
-                                       Requests: make(map[Request]struct{}),
-                               },
-                       })
-               }
                for i, tp := range t.Pieces {
-                       requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{
+                       pieces = append(pieces, filterPiece{
                                t:     t,
                                index: i,
                                Piece: tp,
                        })
-                       if tp.Request && tp.NumPendingChunks != 0 {
-                               for _, p := range peers {
-                                       if p.canRequestPiece(i) {
-                                               p.requestablePiecesRemaining++
-                                       }
-                               }
-                       }
                }
-               t.peers = peers
-               orderTorrents = append(orderTorrents, t)
        }
-       requestOrder.sort()
-       for _, piece := range requestOrder.pieces {
+       sortFilterPieces(pieces)
+       for _, piece := range pieces {
                if left := piece.t.storageLeft; left != nil {
                        if *left < int64(piece.Length) {
                                continue
@@ -143,16 +122,48 @@ func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId
                        continue
                }
                if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
-                       //log.Print("skipping piece")
                        continue
                }
-               allocatePendingChunks(piece, piece.t.peers)
                piece.t.unverifiedBytes += piece.Length
-               //log.Print(piece.t.unverifiedBytes)
+               ret = append(ret, requestablePiece{
+                       index:             piece.index,
+                       t:                 piece.t.Torrent,
+                       NumPendingChunks:  piece.NumPendingChunks,
+                       IterPendingChunks: piece.iterPendingChunksWrapper,
+               })
+       }
+       return
+}
+
+// TODO: We could do metainfo requests here.
+func (requestOrder *ClientPieceOrder) DoRequests(torrents []Torrent) map[PeerId]PeerNextRequestState {
+       requestPieces := getRequestablePieces(torrents)
+       allPeers := make(map[uintptr][]*requestsPeer, len(torrents))
+       for _, t := range torrents {
+               peers := make([]*requestsPeer, 0, len(t.Peers))
+               for _, p := range t.Peers {
+                       peers = append(peers, &requestsPeer{
+                               Peer: p,
+                               nextState: PeerNextRequestState{
+                                       Requests: make(map[Request]struct{}),
+                               },
+                       })
+               }
+               allPeers[t.StableId] = peers
+       }
+       for _, piece := range requestPieces {
+               for _, peer := range allPeers[piece.t.StableId] {
+                       if peer.canRequestPiece(piece.index) {
+                               peer.requestablePiecesRemaining++
+                       }
+               }
+       }
+       for _, piece := range requestPieces {
+               allocatePendingChunks(piece, allPeers[piece.t.StableId])
        }
        ret := make(map[PeerId]PeerNextRequestState)
-       for _, ots := range orderTorrents {
-               for _, rp := range ots.peers {
+       for _, peers := range allPeers {
+               for _, rp := range peers {
                        if rp.requestablePiecesRemaining != 0 {
                                panic(rp.requestablePiecesRemaining)
                        }
@@ -162,7 +173,7 @@ func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId
        return ret
 }
 
-func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) {
+func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
        peersForPiece := make([]*peersForPieceRequests, 0, len(peers))
        for _, peer := range peers {
                peersForPiece = append(peersForPiece, &peersForPieceRequests{
@@ -204,7 +215,7 @@ func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) {
                })
        }
        preallocated := make(map[ChunkSpec]*peersForPieceRequests, p.NumPendingChunks)
-       p.iterPendingChunksWrapper(func(spec ChunkSpec) {
+       p.IterPendingChunks(func(spec ChunkSpec) {
                req := Request{pp.Integer(p.index), spec}
                for _, peer := range peersForPiece {
                        if h := peer.HasExistingRequest; h == nil || !h(req) {
@@ -221,7 +232,7 @@ func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) {
                }
        })
        pendingChunksRemaining := int(p.NumPendingChunks)
-       p.iterPendingChunksWrapper(func(chunk types.ChunkSpec) {
+       p.IterPendingChunks(func(chunk types.ChunkSpec) {
                if _, ok := preallocated[chunk]; ok {
                        return
                }
@@ -243,11 +254,12 @@ func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) {
                                }
                        }
                        peer.addNextRequest(req)
-                       return
+                       break
                }
        })
 chunk:
        for chunk, prePeer := range preallocated {
+               pendingChunksRemaining--
                req := Request{pp.Integer(p.index), chunk}
                prePeer.requestsInPiece--
                sortPeersForPiece(&req)
@@ -266,7 +278,6 @@ chunk:
                                        continue
                                }
                        }
-                       pendingChunksRemaining--
                        peer.addNextRequest(req)
                        continue chunk
                }
index 8c518b5032cfe16eea78558fc9e96446090f2b4d..fd8b53f05a3d007232ac3260518e68d442e3b0ec 100644 (file)
@@ -64,7 +64,7 @@ func TestStealingFromSlowerPeer(t *testing.T) {
        firstStealer.Id = intPeerId(2)
        secondStealer := basePeer
        secondStealer.Id = intPeerId(3)
-       results := order.DoRequests([]*Torrent{{
+       results := order.DoRequests([]Torrent{{
                Pieces: []Piece{{
                        Request:           true,
                        NumPendingChunks:  5,
@@ -111,7 +111,7 @@ func TestStealingFromSlowerPeersBasic(t *testing.T) {
        firstStealer.Id = intPeerId(2)
        secondStealer := basePeer
        secondStealer.Id = intPeerId(3)
-       results := order.DoRequests([]*Torrent{{
+       results := order.DoRequests([]Torrent{{
                Pieces: []Piece{{
                        Request:           true,
                        NumPendingChunks:  2,
@@ -150,7 +150,7 @@ func TestPeerKeepsExistingIfReasonable(t *testing.T) {
        firstStealer.Id = intPeerId(2)
        secondStealer := basePeer
        secondStealer.Id = intPeerId(3)
-       results := order.DoRequests([]*Torrent{{
+       results := order.DoRequests([]Torrent{{
                Pieces: []Piece{{
                        Request:           true,
                        NumPendingChunks:  4,
@@ -198,7 +198,7 @@ func TestDontStealUnnecessarily(t *testing.T) {
        firstStealer.Id = intPeerId(2)
        secondStealer := basePeer
        secondStealer.Id = intPeerId(3)
-       results := order.DoRequests([]*Torrent{{
+       results := order.DoRequests([]Torrent{{
                Pieces: []Piece{{
                        Request:           true,
                        NumPendingChunks:  9,
index 508ed8294fed2743b2d0f22727ce11c15ca5fa30..bc59c052344aaa5689fb7e1af806cec75906bfae 100644 (file)
@@ -4,6 +4,8 @@ import (
        "github.com/anacrolix/torrent/types"
 )
 
+type ChunksIter func(func(types.ChunkSpec))
+
 type Piece struct {
        Request           bool
        Priority          piecePriority
@@ -11,10 +13,10 @@ type Piece struct {
        Availability      int64
        Length            int64
        NumPendingChunks  int
-       IterPendingChunks func(func(types.ChunkSpec))
+       IterPendingChunks ChunksIter
 }
 
-func (p *Piece) iterPendingChunksWrapper(f func(ChunkSpec)) {
+func (p Piece) iterPendingChunksWrapper(f func(ChunkSpec)) {
        i := p.IterPendingChunks
        if i != nil {
                i(f)