]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Store peer requests in a bitmap
authorMatt Joiner <anacrolix@gmail.com>
Sun, 19 Sep 2021 05:16:37 +0000 (15:16 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 19 Sep 2021 05:16:37 +0000 (15:16 +1000)
peerconn.go
peerconn_test.go
piece.go
request-strategy/order.go
request-strategy/order_test.go
request-strategy/peer.go
request-strategy/piece.go
request-strategy/torrent.go
requesting.go
torrent.go
webseed-peer.go

index 452166ac69042337e9f3d1ea854ecac2d6b923bf..1b9bdbc1d2bd4b9a5dfb903eea0b5467e15bf110 100644 (file)
@@ -97,7 +97,7 @@ type Peer struct {
        // Chunks that we might reasonably expect to receive from the peer. Due to
        // latency, buffering, and implementation differences, we may receive
        // chunks that are no longer in the set of requests actually want.
-       validReceiveChunks map[Request]int
+       validReceiveChunks map[RequestIndex]int
        // Indexed by metadata piece, set to true if posted and pending a
        // response.
        metadataRequests []bool
@@ -175,18 +175,20 @@ func (cn *Peer) updateExpectingChunks() {
 }
 
 func (cn *Peer) expectingChunks() bool {
-       if len(cn.actualRequestState.Requests) == 0 {
+       if cn.actualRequestState.Requests.IsEmpty() {
                return false
        }
        if !cn.actualRequestState.Interested {
                return false
        }
-       for r := range cn.actualRequestState.Requests {
-               if !cn.remoteChokingPiece(r.Index.Int()) {
-                       return true
-               }
+       if cn.peerAllowedFast.IterTyped(func(_i int) bool {
+               i := RequestIndex(_i)
+               return cn.actualRequestState.Requests.Rank((i+1)*cn.t.chunksPerRegularPiece())-
+                       cn.actualRequestState.Requests.Rank(i*cn.t.chunksPerRegularPiece()) == 0
+       }) {
+               return true
        }
-       return false
+       return !cn.peerChoking
 }
 
 func (cn *Peer) remoteChokingPiece(piece pieceIndex) bool {
@@ -333,9 +335,10 @@ func (cn *Peer) downloadRate() float64 {
 
 func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
        ret = make(map[pieceIndex]int)
-       for r := range cn.actualRequestState.Requests {
-               ret[pieceIndex(r.Index)]++
-       }
+       cn.actualRequestState.Requests.Iterate(func(x uint32) bool {
+               ret[pieceIndex(x/cn.t.chunksPerRegularPiece())]++
+               return true
+       })
        return
 }
 
@@ -365,7 +368,7 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
                &cn._stats.ChunksReadUseful,
                &cn._stats.ChunksRead,
                &cn._stats.ChunksWritten,
-               len(cn.actualRequestState.Requests),
+               cn.actualRequestState.Requests.GetCardinality(),
                cn.nominalMaxRequests(),
                cn.PeerMaxRequests,
                len(cn.peerRequests),
@@ -547,8 +550,9 @@ func (pc *PeerConn) writeInterested(interested bool) bool {
 // are okay.
 type messageWriter func(pp.Message) bool
 
-func (cn *Peer) shouldRequest(r Request) error {
-       if !cn.peerHasPiece(pieceIndex(r.Index)) {
+func (cn *Peer) shouldRequest(r RequestIndex) error {
+       pi := pieceIndex(r / cn.t.chunksPerRegularPiece())
+       if !cn.peerHasPiece(pi) {
                return errors.New("requesting piece peer doesn't have")
        }
        if !cn.t.peerIsActive(cn) {
@@ -557,42 +561,40 @@ func (cn *Peer) shouldRequest(r Request) error {
        if cn.closed.IsSet() {
                panic("requesting when connection is closed")
        }
-       if cn.t.hashingPiece(pieceIndex(r.Index)) {
+       if cn.t.hashingPiece(pi) {
                panic("piece is being hashed")
        }
-       if cn.t.pieceQueuedForHash(pieceIndex(r.Index)) {
+       if cn.t.pieceQueuedForHash(pi) {
                panic("piece is queued for hash")
        }
-       if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(r.Index)) {
+       if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(pi)) {
                panic("peer choking and piece not allowed fast")
        }
        return nil
 }
 
-func (cn *Peer) request(r Request) (more bool, err error) {
+func (cn *Peer) request(r RequestIndex) (more bool, err error) {
        if err := cn.shouldRequest(r); err != nil {
                panic(err)
        }
-       if _, ok := cn.actualRequestState.Requests[r]; ok {
+       if cn.actualRequestState.Requests.Contains(r) {
                return true, nil
        }
-       if len(cn.actualRequestState.Requests) >= cn.nominalMaxRequests() {
+       if maxRequests(cn.actualRequestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
                return true, errors.New("too many outstanding requests")
        }
-       if cn.actualRequestState.Requests == nil {
-               cn.actualRequestState.Requests = make(map[Request]struct{})
-       }
-       cn.actualRequestState.Requests[r] = struct{}{}
+       cn.actualRequestState.Requests.Add(r)
        if cn.validReceiveChunks == nil {
-               cn.validReceiveChunks = make(map[Request]int)
+               cn.validReceiveChunks = make(map[RequestIndex]int)
        }
        cn.validReceiveChunks[r]++
        cn.t.pendingRequests[r]++
        cn.updateExpectingChunks()
+       ppReq := cn.t.requestIndexToRequest(r)
        for _, f := range cn.callbacks.SentRequest {
-               f(PeerRequestEvent{cn, r})
+               f(PeerRequestEvent{cn, ppReq})
        }
-       return cn.peerImpl._request(r), nil
+       return cn.peerImpl._request(ppReq), nil
 }
 
 func (me *PeerConn) _request(r Request) bool {
@@ -604,9 +606,9 @@ func (me *PeerConn) _request(r Request) bool {
        })
 }
 
-func (me *Peer) cancel(r Request) bool {
+func (me *Peer) cancel(r RequestIndex) bool {
        if me.deleteRequest(r) {
-               return me.peerImpl._cancel(r)
+               return me.peerImpl._cancel(me.t.requestIndexToRequest(r))
        }
        return true
 }
@@ -653,7 +655,7 @@ func (cn *PeerConn) postBitfield() {
 }
 
 func (cn *PeerConn) updateRequests() {
-       if len(cn.actualRequestState.Requests) != 0 {
+       if cn.actualRequestState.Requests.GetCardinality() != 0 {
                return
        }
        cn.tickleWriter()
@@ -1128,7 +1130,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
                case pp.HaveNone:
                        err = c.peerSentHaveNone()
                case pp.Reject:
-                       c.remoteRejectedRequest(newRequestFromMessage(&msg))
+                       c.remoteRejectedRequest(c.t.requestIndexFromRequest(newRequestFromMessage(&msg)))
                case pp.AllowedFast:
                        torrent.Add("allowed fasts received", 1)
                        log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
@@ -1145,13 +1147,13 @@ func (c *PeerConn) mainReadLoop() (err error) {
        }
 }
 
-func (c *Peer) remoteRejectedRequest(r Request) {
+func (c *Peer) remoteRejectedRequest(r RequestIndex) {
        if c.deleteRequest(r) {
                c.decExpectedChunkReceive(r)
        }
 }
 
-func (c *Peer) decExpectedChunkReceive(r Request) {
+func (c *Peer) decExpectedChunkReceive(r RequestIndex) {
        count := c.validReceiveChunks[r]
        if count == 1 {
                delete(c.validReceiveChunks, r)
@@ -1250,7 +1252,8 @@ func (c *Peer) doChunkReadStats(size int64) {
 func (c *Peer) receiveChunk(msg *pp.Message) error {
        chunksReceived.Add("total", 1)
 
-       req := newRequestFromMessage(msg)
+       ppReq := newRequestFromMessage(msg)
+       req := c.t.requestIndexFromRequest(ppReq)
 
        if c.peerChoking {
                chunksReceived.Add("while choked", 1)
@@ -1262,7 +1265,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        }
        c.decExpectedChunkReceive(req)
 
-       if c.peerChoking && c.peerAllowedFast.Get(bitmap.BitIndex(req.Index)) {
+       if c.peerChoking && c.peerAllowedFast.Get(bitmap.BitIndex(ppReq.Index)) {
                chunksReceived.Add("due to allowed fast", 1)
        }
 
@@ -1271,7 +1274,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        // out.
        deletedRequest := false
        {
-               if _, ok := c.actualRequestState.Requests[req]; ok {
+               if c.actualRequestState.Requests.Contains(req) {
                        for _, f := range c.callbacks.ReceivedRequested {
                                f(PeerMessageEvent{c, msg})
                        }
@@ -1291,13 +1294,13 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        cl := t.cl
 
        // Do we actually want this chunk?
-       if t.haveChunk(req) {
+       if t.haveChunk(ppReq) {
                chunksReceived.Add("wasted", 1)
                c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
                return nil
        }
 
-       piece := &t.pieces[req.Index]
+       piece := &t.pieces[ppReq.Index]
 
        c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
        c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
@@ -1316,7 +1319,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        piece.incrementPendingWrites()
        // Record that we have the chunk, so we aren't trying to download it while
        // waiting for it to be written to storage.
-       piece.unpendChunkIndex(chunkIndex(req.ChunkSpec, t.chunkSize))
+       piece.unpendChunkIndex(chunkIndexFromChunkSpec(ppReq.ChunkSpec, t.chunkSize))
 
        // Cancel pending requests for this chunk from *other* peers.
        t.iterPeers(func(p *Peer) {
@@ -1349,11 +1352,11 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
                return nil
        }
 
-       c.onDirtiedPiece(pieceIndex(req.Index))
+       c.onDirtiedPiece(pieceIndex(ppReq.Index))
 
        // We need to ensure the piece is only queued once, so only the last chunk writer gets this job.
-       if t.pieceAllDirty(pieceIndex(req.Index)) && piece.pendingWrites == 0 {
-               t.queuePieceCheck(pieceIndex(req.Index))
+       if t.pieceAllDirty(pieceIndex(ppReq.Index)) && piece.pendingWrites == 0 {
+               t.queuePieceCheck(pieceIndex(ppReq.Index))
                // We don't pend all chunks here anymore because we don't want code dependent on the dirty
                // chunk status (such as the haveChunk call above) to have to check all the various other
                // piece states like queued for hash, hashing etc. This does mean that we need to be sure
@@ -1362,7 +1365,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
 
        cl.event.Broadcast()
        // We do this because we've written a chunk, and may change PieceState.Partial.
-       t.publishPieceChange(pieceIndex(req.Index))
+       t.publishPieceChange(pieceIndex(ppReq.Index))
 
        return nil
 }
@@ -1456,14 +1459,13 @@ func (c *Peer) peerHasWantedPieces() bool {
        return !c._pieceRequestOrder.IsEmpty()
 }
 
-func (c *Peer) deleteRequest(r Request) bool {
-       delete(c.nextRequestState.Requests, r)
-       if _, ok := c.actualRequestState.Requests[r]; !ok {
+func (c *Peer) deleteRequest(r RequestIndex) bool {
+       c.nextRequestState.Requests.Remove(r)
+       if !c.actualRequestState.Requests.CheckedRemove(r) {
                return false
        }
-       delete(c.actualRequestState.Requests, r)
        for _, f := range c.callbacks.DeletedRequest {
-               f(PeerRequestEvent{c, r})
+               f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)})
        }
        c.updateExpectingChunks()
        pr := c.t.pendingRequests
@@ -1479,13 +1481,14 @@ func (c *Peer) deleteRequest(r Request) bool {
 }
 
 func (c *Peer) deleteAllRequests() {
-       for r := range c.actualRequestState.Requests {
-               c.deleteRequest(r)
-       }
-       if l := len(c.actualRequestState.Requests); l != 0 {
-               panic(l)
+       c.actualRequestState.Requests.Clone().Iterate(func(x uint32) bool {
+               c.deleteRequest(x)
+               return true
+       })
+       if !c.actualRequestState.Requests.IsEmpty() {
+               panic(c.actualRequestState.Requests.GetCardinality())
        }
-       c.nextRequestState.Requests = nil
+       c.nextRequestState.Requests.Clear()
        // for c := range c.t.conns {
        //      c.tickleWriter()
        // }
index 2f337958a6f9a2e1483c7731fb8f0b1787037b7b..395dbb62bb1cc8e63f64b22d5eaf11f725ce4354 100644 (file)
@@ -23,7 +23,7 @@ func TestSendBitfieldThenHave(t *testing.T) {
        cl.initLogger()
        c := cl.newConnection(nil, false, nil, "io.Pipe", "")
        c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
-       if err := c.t.setInfo(&metainfo.Info{ Pieces: make([]byte, metainfo.HashSize*3) }); err != nil {
+       if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil {
                t.Log(err)
        }
        r, w := io.Pipe()
@@ -129,7 +129,9 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
                        // The chunk must be written to storage everytime, to ensure the
                        // writeSem is unlocked.
                        t.pieces[0]._dirtyChunks.Clear()
-                       cn.validReceiveChunks = map[Request]int{newRequestFromMessage(&msg): 1}
+                       cn.validReceiveChunks = map[RequestIndex]int{
+                               t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1,
+                       }
                        cl.unlock()
                        n, err := w.Write(wb)
                        require.NoError(b, err)
index c79446972c19ae907e156981264f44edc0a72da4..30ac229787eb82fe594317b1510120b5c5929df1 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -55,12 +55,12 @@ func (p *Piece) Storage() storage.Piece {
        return p.t.storage.Piece(p.Info())
 }
 
-func (p *Piece) pendingChunkIndex(chunkIndex int) bool {
+func (p *Piece) pendingChunkIndex(chunkIndex chunkIndexType) bool {
        return !p._dirtyChunks.Contains(bitmap.BitIndex(chunkIndex))
 }
 
 func (p *Piece) pendingChunk(cs ChunkSpec, chunkSize pp.Integer) bool {
-       return p.pendingChunkIndex(chunkIndex(cs, chunkSize))
+       return p.pendingChunkIndex(chunkIndexFromChunkSpec(cs, chunkSize))
 }
 
 func (p *Piece) hasDirtyChunks() bool {
@@ -71,17 +71,17 @@ func (p *Piece) numDirtyChunks() pp.Integer {
        return pp.Integer(p._dirtyChunks.Len())
 }
 
-func (p *Piece) unpendChunkIndex(i int) {
+func (p *Piece) unpendChunkIndex(i chunkIndexType) {
        p._dirtyChunks.Add(bitmap.BitIndex(i))
        p.readerCond.Broadcast()
 }
 
-func (p *Piece) pendChunkIndex(i int) {
+func (p *Piece) pendChunkIndex(i RequestIndex) {
        p._dirtyChunks.Remove(bitmap.BitIndex(i))
 }
 
 func (p *Piece) numChunks() pp.Integer {
-       return p.t.pieceNumChunks(p.index)
+       return pp.Integer(p.t.pieceNumChunks(p.index))
 }
 
 func (p *Piece) incrementPendingWrites() {
@@ -237,11 +237,15 @@ func (p *Piece) State() PieceState {
        return p.t.PieceState(p.index)
 }
 
-func (p *Piece) iterUndirtiedChunks(f func(cs ChunkSpec)) {
-       for i := pp.Integer(0); i < p.numChunks(); i++ {
-               if p.chunkIndexDirty(i) {
+func (p *Piece) iterUndirtiedChunks(f func(cs chunkIndexType)) {
+       for i := chunkIndexType(0); i < chunkIndexType(p.numChunks()); i++ {
+               if p.chunkIndexDirty(pp.Integer(i)) {
                        continue
                }
-               f(p.chunkIndexSpec(i))
+               f(i)
        }
 }
+
+func (p *Piece) requestIndexOffset() RequestIndex {
+       return RequestIndex(p.index) * p.t.chunksPerRegularPiece()
+}
index 473b02b3fffc4468ea5f3a53539bb2406079dd53..752198a6496e63f3c03d58329bbbb4a55ebbd398 100644 (file)
@@ -10,11 +10,12 @@ import (
        "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/storage"
 
-       pp "github.com/anacrolix/torrent/peer_protocol"
        "github.com/anacrolix/torrent/types"
 )
 
 type (
+       RequestIndex  = uint32
+       ChunkIndex    = uint32
        Request       = types.Request
        pieceIndex    = types.PieceIndex
        piecePriority = types.PiecePriority
@@ -59,15 +60,13 @@ type requestsPeer struct {
 }
 
 func (rp *requestsPeer) canFitRequest() bool {
-       return len(rp.nextState.Requests) < rp.MaxRequests
+       return int(rp.nextState.Requests.GetCardinality()) < rp.MaxRequests
 }
 
-func (rp *requestsPeer) addNextRequest(r Request) {
-       _, ok := rp.nextState.Requests[r]
-       if ok {
+func (rp *requestsPeer) addNextRequest(r RequestIndex) {
+       if !rp.nextState.Requests.CheckedAdd(r) {
                panic("should only add once")
        }
-       rp.nextState.Requests[r] = struct{}{}
 }
 
 type peersForPieceRequests struct {
@@ -75,7 +74,7 @@ type peersForPieceRequests struct {
        *requestsPeer
 }
 
-func (me *peersForPieceRequests) addNextRequest(r Request) {
+func (me *peersForPieceRequests) addNextRequest(r RequestIndex) {
        me.requestsPeer.addNextRequest(r)
        me.requestsInPiece++
 }
@@ -88,6 +87,10 @@ type requestablePiece struct {
        IterPendingChunks ChunksIter
 }
 
+func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex {
+       return RequestIndex(p.t.ChunksPerPiece*p.index) + RequestIndex(c)
+}
+
 type filterPiece struct {
        t     *filterTorrent
        index pieceIndex
@@ -181,9 +184,6 @@ func Run(input Input) map[PeerId]PeerNextRequestState {
                for _, p := range t.Peers {
                        peers = append(peers, &requestsPeer{
                                Peer: p,
-                               nextState: PeerNextRequestState{
-                                       Requests: make(map[Request]struct{}, p.MaxRequests),
-                               },
                        })
                }
                allPeers[t.InfoHash] = peers
@@ -239,7 +239,7 @@ func makePeersForPiece(cap int) []*peersForPieceRequests {
 
 type peersForPieceSorter struct {
        peersForPiece []*peersForPieceRequests
-       req           *Request
+       req           *RequestIndex
        p             requestablePiece
 }
 
@@ -259,8 +259,8 @@ func (me *peersForPieceSorter) Less(_i, _j int) bool {
        byHasRequest := func() multiless.Computation {
                ml := multiless.New()
                if req != nil {
-                       _, iHas := i.nextState.Requests[*req]
-                       _, jHas := j.nextState.Requests[*req]
+                       iHas := i.nextState.Requests.Contains(*req)
+                       jHas := j.nextState.Requests.Contains(*req)
                        ml = ml.Bool(jHas, iHas)
                }
                return ml
@@ -327,16 +327,16 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
                peersForPiece: peersForPiece,
                p:             p,
        }
-       sortPeersForPiece := func(req *Request) {
+       sortPeersForPiece := func(req *RequestIndex) {
                peersForPieceSorter.req = req
                sort.Sort(&peersForPieceSorter)
                //ensureValidSortedPeersForPieceRequests(&peersForPieceSorter)
        }
        // Chunks can be preassigned several times, if peers haven't been able to update their "actual"
        // with "next" request state before another request strategy run occurs.
-       preallocated := make(map[ChunkSpec][]*peersForPieceRequests, p.NumPendingChunks)
-       p.IterPendingChunks(func(spec ChunkSpec) {
-               req := Request{pp.Integer(p.index), spec}
+       preallocated := make([][]*peersForPieceRequests, p.t.ChunksPerPiece)
+       p.IterPendingChunks(func(spec ChunkIndex) {
+               req := p.chunkIndexToRequestIndex(spec)
                for _, peer := range peersForPiece {
                        if h := peer.HasExistingRequest; h == nil || !h(req) {
                                continue
@@ -349,11 +349,11 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
                }
        })
        pendingChunksRemaining := int(p.NumPendingChunks)
-       p.IterPendingChunks(func(chunk types.ChunkSpec) {
-               if _, ok := preallocated[chunk]; ok {
+       p.IterPendingChunks(func(chunk ChunkIndex) {
+               if len(preallocated[chunk]) != 0 {
                        return
                }
-               req := Request{pp.Integer(p.index), chunk}
+               req := p.chunkIndexToRequestIndex(chunk)
                defer func() { pendingChunksRemaining-- }()
                sortPeersForPiece(nil)
                for _, peer := range peersForPiece {
@@ -373,14 +373,17 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
        })
 chunk:
        for chunk, prePeers := range preallocated {
+               if len(prePeers) == 0 {
+                       continue
+               }
                pendingChunksRemaining--
-               req := Request{pp.Integer(p.index), chunk}
+               req := p.chunkIndexToRequestIndex(ChunkIndex(chunk))
                for _, pp := range prePeers {
                        pp.requestsInPiece--
                }
                sortPeersForPiece(&req)
                for _, pp := range prePeers {
-                       delete(pp.nextState.Requests, req)
+                       pp.nextState.Requests.Remove(req)
                }
                for _, peer := range peersForPiece {
                        if !peer.canFitRequest() {
index 89a85085bff67f053ec0fff6c63ba48c0c091145..d15988f74c7bb2ff5bc9743f85cd8a41dea59411 100644 (file)
@@ -4,36 +4,29 @@ import (
        "math"
        "testing"
 
+       "github.com/RoaringBitmap/roaring"
        qt "github.com/frankban/quicktest"
-
-       pp "github.com/anacrolix/torrent/peer_protocol"
+       "github.com/google/go-cmp/cmp"
 )
 
-func r(i pieceIndex, begin int) Request {
-       return Request{pp.Integer(i), ChunkSpec{pp.Integer(begin), 1}}
-}
-
-func chunkIterRange(end int) func(func(ChunkSpec)) {
-       return func(f func(ChunkSpec)) {
-               for offset := 0; offset < end; offset += 1 {
-                       f(ChunkSpec{pp.Integer(offset), 1})
+func chunkIterRange(end ChunkIndex) ChunksIter {
+       return func(f func(ChunkIndex)) {
+               for offset := ChunkIndex(0); offset < end; offset += 1 {
+                       f(offset)
                }
        }
 }
 
-func chunkIter(offsets ...int) func(func(ChunkSpec)) {
-       return func(f func(ChunkSpec)) {
+func chunkIter(offsets ...ChunkIndex) ChunksIter {
+       return func(f func(ChunkIndex)) {
                for _, offset := range offsets {
-                       f(ChunkSpec{pp.Integer(offset), 1})
+                       f(offset)
                }
        }
 }
 
-func requestSetFromSlice(rs ...Request) (ret map[Request]struct{}) {
-       ret = make(map[Request]struct{}, len(rs))
-       for _, r := range rs {
-               ret[r] = struct{}{}
-       }
+func requestSetFromSlice(rs ...RequestIndex) (ret roaring.Bitmap) {
+       ret.AddMany(rs)
        return
 }
 
@@ -43,6 +36,8 @@ func (i intPeerId) Uintptr() uintptr {
        return uintptr(i)
 }
 
+func hasAllRequests(RequestIndex) bool { return true }
+
 func TestStealingFromSlowerPeer(t *testing.T) {
        c := qt.New(t)
        basePeer := Peer{
@@ -55,15 +50,14 @@ func TestStealingFromSlowerPeer(t *testing.T) {
        // Slower than the stealers, but has all requests already.
        stealee := basePeer
        stealee.DownloadRate = 1
-       stealee.HasExistingRequest = func(r Request) bool {
-               return true
-       }
+       stealee.HasExistingRequest = hasAllRequests
        stealee.Id = intPeerId(1)
        firstStealer := basePeer
        firstStealer.Id = intPeerId(2)
        secondStealer := basePeer
        secondStealer.Id = intPeerId(3)
        results := Run(Input{Torrents: []Torrent{{
+               ChunksPerPiece: 9,
                Pieces: []Piece{{
                        Request:           true,
                        NumPendingChunks:  5,
@@ -77,8 +71,9 @@ func TestStealingFromSlowerPeer(t *testing.T) {
        }}})
 
        c.Assert(results, qt.HasLen, 3)
-       check := func(p PeerId, l int) {
-               c.Check(results[p].Requests, qt.HasLen, l)
+       check := func(p PeerId, l uint64) {
+               addressableBm := results[p].Requests
+               c.Check(addressableBm.GetCardinality(), qt.ContentEquals, l)
                c.Check(results[p].Interested, qt.Equals, l > 0)
        }
        check(stealee.Id, 1)
@@ -86,8 +81,9 @@ func TestStealingFromSlowerPeer(t *testing.T) {
        check(secondStealer.Id, 2)
 }
 
-func checkNumRequestsAndInterest(c *qt.C, next PeerNextRequestState, num int, interest bool) {
-       c.Check(next.Requests, qt.HasLen, num)
+func checkNumRequestsAndInterest(c *qt.C, next PeerNextRequestState, num uint64, interest bool) {
+       addressableBm := next.Requests
+       c.Check(addressableBm.GetCardinality(), qt.ContentEquals, num)
        c.Check(next.Interested, qt.Equals, interest)
 }
 
@@ -102,15 +98,14 @@ func TestStealingFromSlowerPeersBasic(t *testing.T) {
        }
        stealee := basePeer
        stealee.DownloadRate = 1
-       stealee.HasExistingRequest = func(r Request) bool {
-               return true
-       }
+       stealee.HasExistingRequest = hasAllRequests
        stealee.Id = intPeerId(1)
        firstStealer := basePeer
        firstStealer.Id = intPeerId(2)
        secondStealer := basePeer
        secondStealer.Id = intPeerId(3)
        results := Run(Input{Torrents: []Torrent{{
+               ChunksPerPiece: 9,
                Pieces: []Piece{{
                        Request:           true,
                        NumPendingChunks:  2,
@@ -128,6 +123,10 @@ func TestStealingFromSlowerPeersBasic(t *testing.T) {
        checkNumRequestsAndInterest(c, results[stealee.Id], 0, false)
 }
 
+func checkResultsRequestsLen(t *testing.T, reqs roaring.Bitmap, l uint64) {
+       qt.Check(t, reqs.GetCardinality(), qt.Equals, l)
+}
+
 func TestPeerKeepsExistingIfReasonable(t *testing.T) {
        c := qt.New(t)
        basePeer := Peer{
@@ -140,8 +139,8 @@ func TestPeerKeepsExistingIfReasonable(t *testing.T) {
        // Slower than the stealers, but has all requests already.
        stealee := basePeer
        stealee.DownloadRate = 1
-       keepReq := r(0, 0)
-       stealee.HasExistingRequest = func(r Request) bool {
+       keepReq := RequestIndex(0)
+       stealee.HasExistingRequest = func(r RequestIndex) bool {
                return r == keepReq
        }
        stealee.Id = intPeerId(1)
@@ -150,6 +149,7 @@ func TestPeerKeepsExistingIfReasonable(t *testing.T) {
        secondStealer := basePeer
        secondStealer.Id = intPeerId(3)
        results := Run(Input{Torrents: []Torrent{{
+               ChunksPerPiece: 9,
                Pieces: []Piece{{
                        Request:           true,
                        NumPendingChunks:  4,
@@ -163,18 +163,29 @@ func TestPeerKeepsExistingIfReasonable(t *testing.T) {
        }}})
 
        c.Assert(results, qt.HasLen, 3)
-       check := func(p PeerId, l int) {
-               c.Check(results[p].Requests, qt.HasLen, l)
+       check := func(p PeerId, l uint64) {
+               checkResultsRequestsLen(t, results[p].Requests, l)
                c.Check(results[p].Interested, qt.Equals, l > 0)
        }
        check(firstStealer.Id, 2)
        check(secondStealer.Id, 1)
-       c.Check(results[stealee.Id], qt.ContentEquals, PeerNextRequestState{
-               Interested: true,
-               Requests:   requestSetFromSlice(keepReq),
-       })
+       c.Check(
+               results[stealee.Id],
+               peerNextRequestStateChecker,
+               PeerNextRequestState{
+                       Interested: true,
+                       Requests:   requestSetFromSlice(keepReq),
+               },
+       )
 }
 
+var peerNextRequestStateChecker = qt.CmpEquals(
+       cmp.Transformer(
+               "bitmap",
+               func(bm roaring.Bitmap) []uint32 {
+                       return bm.ToArray()
+               }))
+
 func TestDontStealUnnecessarily(t *testing.T) {
        c := qt.New(t)
        basePeer := Peer{
@@ -187,12 +198,14 @@ func TestDontStealUnnecessarily(t *testing.T) {
        // Slower than the stealers, but has all requests already.
        stealee := basePeer
        stealee.DownloadRate = 1
+       r := func(i, c RequestIndex) RequestIndex {
+               return i*9 + c
+       }
        keepReqs := requestSetFromSlice(
                r(3, 2), r(3, 4), r(3, 6), r(3, 8),
                r(4, 0), r(4, 1), r(4, 7), r(4, 8))
-       stealee.HasExistingRequest = func(r Request) bool {
-               _, ok := keepReqs[r]
-               return ok
+       stealee.HasExistingRequest = func(r RequestIndex) bool {
+               return keepReqs.Contains(r)
        }
        stealee.Id = intPeerId(1)
        firstStealer := basePeer
@@ -208,6 +221,7 @@ func TestDontStealUnnecessarily(t *testing.T) {
                }
        }
        results := Run(Input{Torrents: []Torrent{{
+               ChunksPerPiece: 9,
                Pieces: []Piece{
                        {
                                Request:           true,
@@ -242,16 +256,20 @@ func TestDontStealUnnecessarily(t *testing.T) {
        }}})
 
        c.Assert(results, qt.HasLen, 3)
-       check := func(p PeerId, l int) {
-               c.Check(results[p].Requests, qt.HasLen, l)
+       check := func(p PeerId, l uint64) {
+               checkResultsRequestsLen(t, results[p].Requests, l)
                c.Check(results[p].Interested, qt.Equals, l > 0)
        }
        check(firstStealer.Id, 5)
        check(secondStealer.Id, 7+9)
-       c.Check(results[stealee.Id], qt.ContentEquals, PeerNextRequestState{
-               Interested: true,
-               Requests:   requestSetFromSlice(r(4, 0), r(4, 1), r(4, 7), r(4, 8)),
-       })
+       c.Check(
+               results[stealee.Id],
+               peerNextRequestStateChecker,
+               PeerNextRequestState{
+                       Interested: true,
+                       Requests:   requestSetFromSlice(r(4, 0), r(4, 1), r(4, 7), r(4, 8)),
+               },
+       )
 }
 
 // This tests a situation where multiple peers had the same existing request, due to "actual" and
@@ -260,10 +278,8 @@ func TestDontStealUnnecessarily(t *testing.T) {
 func TestDuplicatePreallocations(t *testing.T) {
        peer := func(id int, downloadRate float64) Peer {
                return Peer{
-                       HasExistingRequest: func(r Request) bool {
-                               return true
-                       },
-                       MaxRequests: 2,
+                       HasExistingRequest: hasAllRequests,
+                       MaxRequests:        2,
                        HasPiece: func(i pieceIndex) bool {
                                return true
                        },
@@ -273,6 +289,7 @@ func TestDuplicatePreallocations(t *testing.T) {
        }
        results := Run(Input{
                Torrents: []Torrent{{
+                       ChunksPerPiece: 1,
                        Pieces: []Piece{{
                                Request:           true,
                                NumPendingChunks:  1,
@@ -292,5 +309,7 @@ func TestDuplicatePreallocations(t *testing.T) {
                }},
        })
        c := qt.New(t)
-       c.Assert(2, qt.Equals, len(results[intPeerId(1)].Requests)+len(results[intPeerId(2)].Requests))
+       req1 := results[intPeerId(1)].Requests
+       req2 := results[intPeerId(2)].Requests
+       c.Assert(uint64(2), qt.Equals, req1.GetCardinality()+req2.GetCardinality())
 }
index b62220641778c291c25281435b7ba9d5a3401018..ece8ea427adce35c2c4961294ad97cf56367d157 100644 (file)
@@ -2,11 +2,13 @@ package request_strategy
 
 import (
        "time"
+
+       "github.com/RoaringBitmap/roaring"
 )
 
 type PeerNextRequestState struct {
        Interested bool
-       Requests   map[Request]struct{}
+       Requests   roaring.Bitmap
 }
 
 type PeerId interface {
@@ -16,7 +18,7 @@ type PeerId interface {
 type Peer struct {
        HasPiece           func(i pieceIndex) bool
        MaxRequests        int
-       HasExistingRequest func(r Request) bool
+       HasExistingRequest func(r RequestIndex) bool
        Choking            bool
        PieceAllowedFast   func(pieceIndex) bool
        DownloadRate       float64
index bc59c052344aaa5689fb7e1af806cec75906bfae..2dbe0bcf40f820e46a103a2f2dde1a6ba2d40ad2 100644 (file)
@@ -1,10 +1,6 @@
 package request_strategy
 
-import (
-       "github.com/anacrolix/torrent/types"
-)
-
-type ChunksIter func(func(types.ChunkSpec))
+type ChunksIter func(func(ChunkIndex))
 
 type Piece struct {
        Request           bool
@@ -16,7 +12,7 @@ type Piece struct {
        IterPendingChunks ChunksIter
 }
 
-func (p Piece) iterPendingChunksWrapper(f func(ChunkSpec)) {
+func (p Piece) iterPendingChunksWrapper(f func(ChunkIndex)) {
        i := p.IterPendingChunks
        if i != nil {
                i(f)
index a92bffc0bda8199c15c5a2b16a34eb57466de578..262ae9656682cc9f04bc14aa3c1dd0666aeddf37 100644 (file)
@@ -11,7 +11,8 @@ type Torrent struct {
        // Unclosed Peers. Not necessary for getting requestable piece ordering.
        Peers []Peer
        // Some value that's unique and stable between runs. Could even use the infohash?
-       InfoHash metainfo.Hash
+       InfoHash       metainfo.Hash
+       ChunksPerPiece int
 
        MaxUnverifiedBytes int64
 }
index 29a6dd96dfdcbecb3ec4747b98c0e89a431f1e2c..9f896738562afc4b42458010ccd35c4770ea369a 100644 (file)
@@ -5,7 +5,6 @@ import (
        "unsafe"
 
        "github.com/anacrolix/missinggo/v2/bitmap"
-       pp "github.com/anacrolix/torrent/peer_protocol"
 
        "github.com/anacrolix/chansync"
        request_strategy "github.com/anacrolix/torrent/request-strategy"
@@ -43,8 +42,13 @@ func (cl *Client) tickleRequester() {
 func (cl *Client) getRequestStrategyInput() request_strategy.Input {
        ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
        for _, t := range cl.torrents {
+               if !t.haveInfo() {
+                       // This would be removed if metadata is handled here.
+                       continue
+               }
                rst := request_strategy.Torrent{
-                       InfoHash: t.infoHash,
+                       InfoHash:       t.infoHash,
+                       ChunksPerPiece: (t.usualPieceSize() + int(t.chunkSize) - 1) / int(t.chunkSize),
                }
                if t.storage != nil {
                        rst.Capacity = t.storage.Capacity
@@ -73,9 +77,8 @@ func (cl *Client) getRequestStrategyInput() request_strategy.Input {
                        rst.Peers = append(rst.Peers, request_strategy.Peer{
                                HasPiece:    p.peerHasPiece,
                                MaxRequests: p.nominalMaxRequests(),
-                               HasExistingRequest: func(r request_strategy.Request) bool {
-                                       _, ok := p.actualRequestState.Requests[r]
-                                       return ok
+                               HasExistingRequest: func(r RequestIndex) bool {
+                                       return p.actualRequestState.Requests.Contains(r)
                                },
                                Choking: p.peerChoking,
                                PieceAllowedFast: func(i pieceIndex) bool {
@@ -119,8 +122,11 @@ func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.Pee
        p.onNextRequestStateChanged()
 }
 
+type RequestIndex = request_strategy.RequestIndex
+type chunkIndexType = request_strategy.ChunkIndex
+
 func (p *Peer) applyNextRequestState() bool {
-       if len(p.actualRequestState.Requests) > p.nominalMaxRequests()/2 {
+       if p.actualRequestState.Requests.GetCardinality() > uint64(p.nominalMaxRequests()/2) {
                return true
        }
        type piece struct {
@@ -148,8 +154,8 @@ func (p *Peer) applyNextRequestState() bool {
        for _, endGameIter := range []bool{false, true} {
                for _, piece := range pieceOrder {
                        tp := p.t.piece(piece.index)
-                       tp.iterUndirtiedChunks(func(cs ChunkSpec) {
-                               req := Request{pp.Integer(piece.index), cs}
+                       tp.iterUndirtiedChunks(func(cs chunkIndexType) {
+                               req := cs + tp.requestIndexOffset()
                                if !piece.endGame && !endGameIter && p.t.pendingRequests[req] > 0 {
                                        return
                                }
@@ -158,10 +164,10 @@ func (p *Peer) applyNextRequestState() bool {
                                if !more {
                                        return
                                }
-                               if len(p.actualRequestState.Requests) >= p.nominalMaxRequests() {
+                               if maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
                                        return
                                }
-                               if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req.Index)) {
+                               if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(piece.index)) {
                                        return
                                }
                                var err error
@@ -170,7 +176,7 @@ func (p *Peer) applyNextRequestState() bool {
                                        panic(err)
                                }
                        })
-                       if interested && len(p.actualRequestState.Requests) >= p.nominalMaxRequests() {
+                       if interested && maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
                                break
                        }
                        if !more {
index a584874b383cb327933e2cbde47f4fc34c4dd696..bf7968cf0671f0db2996cb833a2960e076f1afb9 100644 (file)
@@ -143,7 +143,7 @@ type Torrent struct {
        connPieceInclinationPool sync.Pool
 
        // Count of each request across active connections.
-       pendingRequests map[Request]int
+       pendingRequests map[RequestIndex]int
 
        pex pexState
 }
@@ -440,7 +440,7 @@ func (t *Torrent) onSetInfo() {
        t.cl.event.Broadcast()
        close(t.gotMetainfoC)
        t.updateWantPeersEvent()
-       t.pendingRequests = make(map[Request]int)
+       t.pendingRequests = make(map[RequestIndex]int)
        t.tryCreateMorePieceHashers()
 }
 
@@ -842,8 +842,12 @@ func (t *Torrent) bitfield() (bf []bool) {
        return
 }
 
-func (t *Torrent) pieceNumChunks(piece pieceIndex) pp.Integer {
-       return (t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize
+func (t *Torrent) chunksPerRegularPiece() uint32 {
+       return uint32((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
+}
+
+func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
+       return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
 }
 
 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
@@ -940,8 +944,8 @@ func (t *Torrent) haveChunk(r Request) (ret bool) {
        return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
 }
 
-func chunkIndex(cs ChunkSpec, chunkSize pp.Integer) int {
-       return int(cs.Begin / chunkSize)
+func chunkIndexFromChunkSpec(cs ChunkSpec, chunkSize pp.Integer) chunkIndexType {
+       return chunkIndexType(cs.Begin / chunkSize)
 }
 
 func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
@@ -1033,7 +1037,7 @@ func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
        if t.pieceComplete(piece) {
                return 0
        }
-       return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
+       return pp.Integer(t.pieceNumChunks(piece)) - t.pieces[piece].numDirtyChunks()
 }
 
 func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
@@ -1170,9 +1174,8 @@ func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
        return ret
 }
 
-func (t *Torrent) pendRequest(req Request) {
-       ci := chunkIndex(req.ChunkSpec, t.chunkSize)
-       t.pieces[req.Index].pendChunkIndex(ci)
+func (t *Torrent) pendRequest(req RequestIndex) {
+       t.piece(int(req / t.chunksPerRegularPiece())).pendChunkIndex(req % t.chunksPerRegularPiece())
 }
 
 func (t *Torrent) pieceCompletionChanged(piece pieceIndex) {
@@ -2259,3 +2262,19 @@ func (t *Torrent) peerIsActive(p *Peer) (active bool) {
        })
        return
 }
+
+func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request {
+       index := ri / t.chunksPerRegularPiece()
+       return Request{
+               pp.Integer(index),
+               t.piece(int(index)).chunkIndexSpec(pp.Integer(ri % t.chunksPerRegularPiece())),
+       }
+}
+
+func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
+       return t.chunksPerRegularPiece()*uint32(r.Index) + uint32(r.Begin/t.chunkSize)
+}
+
+func (t *Torrent) numChunks() RequestIndex {
+       return RequestIndex((t.Length() + int64(t.chunkSize) - 1) / int64(t.chunkSize))
+}
index f2ccbff5550df6cc414ad5f5f5b3e96dd35527af..9263aa46227f7f356717da7e1f5e36d889fff73c 100644 (file)
@@ -78,11 +78,17 @@ func (ws *webseedPeer) requester() {
        defer ws.requesterCond.L.Unlock()
 start:
        for !ws.peer.closed.IsSet() {
-               for r := range ws.peer.actualRequestState.Requests {
+               restart := false
+               ws.peer.actualRequestState.Requests.Iterate(func(x uint32) bool {
+                       r := ws.peer.t.requestIndexToRequest(x)
                        if _, ok := ws.activeRequests[r]; ok {
-                               continue
+                               return true
                        }
                        ws.doRequest(r)
+                       restart = true
+                       return false
+               })
+               if restart {
                        goto start
                }
                ws.requesterCond.Wait()
@@ -134,7 +140,7 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
                        }() {
                        ws.peer.close()
                } else {
-                       ws.peer.remoteRejectedRequest(r)
+                       ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r))
                }
        } else {
                err := ws.peer.receiveChunk(&pp.Message{