From 1d2d1a9cdeeb3e0300478c3f73fd3afdeda89b56 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 19 Sep 2021 15:16:37 +1000 Subject: [PATCH] Store peer requests in a bitmap --- peerconn.go | 109 +++++++++++++++--------------- peerconn_test.go | 6 +- piece.go | 22 +++--- request-strategy/order.go | 47 +++++++------ request-strategy/order_test.go | 119 +++++++++++++++++++-------------- request-strategy/peer.go | 6 +- request-strategy/piece.go | 8 +-- request-strategy/torrent.go | 3 +- requesting.go | 28 +++++--- torrent.go | 39 ++++++++--- webseed-peer.go | 12 +++- 11 files changed, 230 insertions(+), 169 deletions(-) diff --git a/peerconn.go b/peerconn.go index 452166ac..1b9bdbc1 100644 --- a/peerconn.go +++ b/peerconn.go @@ -97,7 +97,7 @@ type Peer struct { // Chunks that we might reasonably expect to receive from the peer. Due to // latency, buffering, and implementation differences, we may receive // chunks that are no longer in the set of requests actually want. - validReceiveChunks map[Request]int + validReceiveChunks map[RequestIndex]int // Indexed by metadata piece, set to true if posted and pending a // response. metadataRequests []bool @@ -175,18 +175,20 @@ func (cn *Peer) updateExpectingChunks() { } func (cn *Peer) expectingChunks() bool { - if len(cn.actualRequestState.Requests) == 0 { + if cn.actualRequestState.Requests.IsEmpty() { return false } if !cn.actualRequestState.Interested { return false } - for r := range cn.actualRequestState.Requests { - if !cn.remoteChokingPiece(r.Index.Int()) { - return true - } + if cn.peerAllowedFast.IterTyped(func(_i int) bool { + i := RequestIndex(_i) + return cn.actualRequestState.Requests.Rank((i+1)*cn.t.chunksPerRegularPiece())- + cn.actualRequestState.Requests.Rank(i*cn.t.chunksPerRegularPiece()) == 0 + }) { + return true } - return false + return !cn.peerChoking } func (cn *Peer) remoteChokingPiece(piece pieceIndex) bool { @@ -333,9 +335,10 @@ func (cn *Peer) downloadRate() float64 { func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) { ret = make(map[pieceIndex]int) - for r := range cn.actualRequestState.Requests { - ret[pieceIndex(r.Index)]++ - } + cn.actualRequestState.Requests.Iterate(func(x uint32) bool { + ret[pieceIndex(x/cn.t.chunksPerRegularPiece())]++ + return true + }) return } @@ -365,7 +368,7 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) { &cn._stats.ChunksReadUseful, &cn._stats.ChunksRead, &cn._stats.ChunksWritten, - len(cn.actualRequestState.Requests), + cn.actualRequestState.Requests.GetCardinality(), cn.nominalMaxRequests(), cn.PeerMaxRequests, len(cn.peerRequests), @@ -547,8 +550,9 @@ func (pc *PeerConn) writeInterested(interested bool) bool { // are okay. type messageWriter func(pp.Message) bool -func (cn *Peer) shouldRequest(r Request) error { - if !cn.peerHasPiece(pieceIndex(r.Index)) { +func (cn *Peer) shouldRequest(r RequestIndex) error { + pi := pieceIndex(r / cn.t.chunksPerRegularPiece()) + if !cn.peerHasPiece(pi) { return errors.New("requesting piece peer doesn't have") } if !cn.t.peerIsActive(cn) { @@ -557,42 +561,40 @@ func (cn *Peer) shouldRequest(r Request) error { if cn.closed.IsSet() { panic("requesting when connection is closed") } - if cn.t.hashingPiece(pieceIndex(r.Index)) { + if cn.t.hashingPiece(pi) { panic("piece is being hashed") } - if cn.t.pieceQueuedForHash(pieceIndex(r.Index)) { + if cn.t.pieceQueuedForHash(pi) { panic("piece is queued for hash") } - if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(r.Index)) { + if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(pi)) { panic("peer choking and piece not allowed fast") } return nil } -func (cn *Peer) request(r Request) (more bool, err error) { +func (cn *Peer) request(r RequestIndex) (more bool, err error) { if err := cn.shouldRequest(r); err != nil { panic(err) } - if _, ok := cn.actualRequestState.Requests[r]; ok { + if cn.actualRequestState.Requests.Contains(r) { return true, nil } - if len(cn.actualRequestState.Requests) >= cn.nominalMaxRequests() { + if maxRequests(cn.actualRequestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() { return true, errors.New("too many outstanding requests") } - if cn.actualRequestState.Requests == nil { - cn.actualRequestState.Requests = make(map[Request]struct{}) - } - cn.actualRequestState.Requests[r] = struct{}{} + cn.actualRequestState.Requests.Add(r) if cn.validReceiveChunks == nil { - cn.validReceiveChunks = make(map[Request]int) + cn.validReceiveChunks = make(map[RequestIndex]int) } cn.validReceiveChunks[r]++ cn.t.pendingRequests[r]++ cn.updateExpectingChunks() + ppReq := cn.t.requestIndexToRequest(r) for _, f := range cn.callbacks.SentRequest { - f(PeerRequestEvent{cn, r}) + f(PeerRequestEvent{cn, ppReq}) } - return cn.peerImpl._request(r), nil + return cn.peerImpl._request(ppReq), nil } func (me *PeerConn) _request(r Request) bool { @@ -604,9 +606,9 @@ func (me *PeerConn) _request(r Request) bool { }) } -func (me *Peer) cancel(r Request) bool { +func (me *Peer) cancel(r RequestIndex) bool { if me.deleteRequest(r) { - return me.peerImpl._cancel(r) + return me.peerImpl._cancel(me.t.requestIndexToRequest(r)) } return true } @@ -653,7 +655,7 @@ func (cn *PeerConn) postBitfield() { } func (cn *PeerConn) updateRequests() { - if len(cn.actualRequestState.Requests) != 0 { + if cn.actualRequestState.Requests.GetCardinality() != 0 { return } cn.tickleWriter() @@ -1128,7 +1130,7 @@ func (c *PeerConn) mainReadLoop() (err error) { case pp.HaveNone: err = c.peerSentHaveNone() case pp.Reject: - c.remoteRejectedRequest(newRequestFromMessage(&msg)) + c.remoteRejectedRequest(c.t.requestIndexFromRequest(newRequestFromMessage(&msg))) case pp.AllowedFast: torrent.Add("allowed fasts received", 1) log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger) @@ -1145,13 +1147,13 @@ func (c *PeerConn) mainReadLoop() (err error) { } } -func (c *Peer) remoteRejectedRequest(r Request) { +func (c *Peer) remoteRejectedRequest(r RequestIndex) { if c.deleteRequest(r) { c.decExpectedChunkReceive(r) } } -func (c *Peer) decExpectedChunkReceive(r Request) { +func (c *Peer) decExpectedChunkReceive(r RequestIndex) { count := c.validReceiveChunks[r] if count == 1 { delete(c.validReceiveChunks, r) @@ -1250,7 +1252,8 @@ func (c *Peer) doChunkReadStats(size int64) { func (c *Peer) receiveChunk(msg *pp.Message) error { chunksReceived.Add("total", 1) - req := newRequestFromMessage(msg) + ppReq := newRequestFromMessage(msg) + req := c.t.requestIndexFromRequest(ppReq) if c.peerChoking { chunksReceived.Add("while choked", 1) @@ -1262,7 +1265,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { } c.decExpectedChunkReceive(req) - if c.peerChoking && c.peerAllowedFast.Get(bitmap.BitIndex(req.Index)) { + if c.peerChoking && c.peerAllowedFast.Get(bitmap.BitIndex(ppReq.Index)) { chunksReceived.Add("due to allowed fast", 1) } @@ -1271,7 +1274,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { // out. deletedRequest := false { - if _, ok := c.actualRequestState.Requests[req]; ok { + if c.actualRequestState.Requests.Contains(req) { for _, f := range c.callbacks.ReceivedRequested { f(PeerMessageEvent{c, msg}) } @@ -1291,13 +1294,13 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { cl := t.cl // Do we actually want this chunk? - if t.haveChunk(req) { + if t.haveChunk(ppReq) { chunksReceived.Add("wasted", 1) c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted })) return nil } - piece := &t.pieces[req.Index] + piece := &t.pieces[ppReq.Index] c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful })) c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData })) @@ -1316,7 +1319,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { piece.incrementPendingWrites() // Record that we have the chunk, so we aren't trying to download it while // waiting for it to be written to storage. - piece.unpendChunkIndex(chunkIndex(req.ChunkSpec, t.chunkSize)) + piece.unpendChunkIndex(chunkIndexFromChunkSpec(ppReq.ChunkSpec, t.chunkSize)) // Cancel pending requests for this chunk from *other* peers. t.iterPeers(func(p *Peer) { @@ -1349,11 +1352,11 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { return nil } - c.onDirtiedPiece(pieceIndex(req.Index)) + c.onDirtiedPiece(pieceIndex(ppReq.Index)) // We need to ensure the piece is only queued once, so only the last chunk writer gets this job. - if t.pieceAllDirty(pieceIndex(req.Index)) && piece.pendingWrites == 0 { - t.queuePieceCheck(pieceIndex(req.Index)) + if t.pieceAllDirty(pieceIndex(ppReq.Index)) && piece.pendingWrites == 0 { + t.queuePieceCheck(pieceIndex(ppReq.Index)) // We don't pend all chunks here anymore because we don't want code dependent on the dirty // chunk status (such as the haveChunk call above) to have to check all the various other // piece states like queued for hash, hashing etc. This does mean that we need to be sure @@ -1362,7 +1365,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { cl.event.Broadcast() // We do this because we've written a chunk, and may change PieceState.Partial. - t.publishPieceChange(pieceIndex(req.Index)) + t.publishPieceChange(pieceIndex(ppReq.Index)) return nil } @@ -1456,14 +1459,13 @@ func (c *Peer) peerHasWantedPieces() bool { return !c._pieceRequestOrder.IsEmpty() } -func (c *Peer) deleteRequest(r Request) bool { - delete(c.nextRequestState.Requests, r) - if _, ok := c.actualRequestState.Requests[r]; !ok { +func (c *Peer) deleteRequest(r RequestIndex) bool { + c.nextRequestState.Requests.Remove(r) + if !c.actualRequestState.Requests.CheckedRemove(r) { return false } - delete(c.actualRequestState.Requests, r) for _, f := range c.callbacks.DeletedRequest { - f(PeerRequestEvent{c, r}) + f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)}) } c.updateExpectingChunks() pr := c.t.pendingRequests @@ -1479,13 +1481,14 @@ func (c *Peer) deleteRequest(r Request) bool { } func (c *Peer) deleteAllRequests() { - for r := range c.actualRequestState.Requests { - c.deleteRequest(r) - } - if l := len(c.actualRequestState.Requests); l != 0 { - panic(l) + c.actualRequestState.Requests.Clone().Iterate(func(x uint32) bool { + c.deleteRequest(x) + return true + }) + if !c.actualRequestState.Requests.IsEmpty() { + panic(c.actualRequestState.Requests.GetCardinality()) } - c.nextRequestState.Requests = nil + c.nextRequestState.Requests.Clear() // for c := range c.t.conns { // c.tickleWriter() // } diff --git a/peerconn_test.go b/peerconn_test.go index 2f337958..395dbb62 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -23,7 +23,7 @@ func TestSendBitfieldThenHave(t *testing.T) { cl.initLogger() c := cl.newConnection(nil, false, nil, "io.Pipe", "") c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil)) - if err := c.t.setInfo(&metainfo.Info{ Pieces: make([]byte, metainfo.HashSize*3) }); err != nil { + if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil { t.Log(err) } r, w := io.Pipe() @@ -129,7 +129,9 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { // The chunk must be written to storage everytime, to ensure the // writeSem is unlocked. t.pieces[0]._dirtyChunks.Clear() - cn.validReceiveChunks = map[Request]int{newRequestFromMessage(&msg): 1} + cn.validReceiveChunks = map[RequestIndex]int{ + t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1, + } cl.unlock() n, err := w.Write(wb) require.NoError(b, err) diff --git a/piece.go b/piece.go index c7944697..30ac2297 100644 --- a/piece.go +++ b/piece.go @@ -55,12 +55,12 @@ func (p *Piece) Storage() storage.Piece { return p.t.storage.Piece(p.Info()) } -func (p *Piece) pendingChunkIndex(chunkIndex int) bool { +func (p *Piece) pendingChunkIndex(chunkIndex chunkIndexType) bool { return !p._dirtyChunks.Contains(bitmap.BitIndex(chunkIndex)) } func (p *Piece) pendingChunk(cs ChunkSpec, chunkSize pp.Integer) bool { - return p.pendingChunkIndex(chunkIndex(cs, chunkSize)) + return p.pendingChunkIndex(chunkIndexFromChunkSpec(cs, chunkSize)) } func (p *Piece) hasDirtyChunks() bool { @@ -71,17 +71,17 @@ func (p *Piece) numDirtyChunks() pp.Integer { return pp.Integer(p._dirtyChunks.Len()) } -func (p *Piece) unpendChunkIndex(i int) { +func (p *Piece) unpendChunkIndex(i chunkIndexType) { p._dirtyChunks.Add(bitmap.BitIndex(i)) p.readerCond.Broadcast() } -func (p *Piece) pendChunkIndex(i int) { +func (p *Piece) pendChunkIndex(i RequestIndex) { p._dirtyChunks.Remove(bitmap.BitIndex(i)) } func (p *Piece) numChunks() pp.Integer { - return p.t.pieceNumChunks(p.index) + return pp.Integer(p.t.pieceNumChunks(p.index)) } func (p *Piece) incrementPendingWrites() { @@ -237,11 +237,15 @@ func (p *Piece) State() PieceState { return p.t.PieceState(p.index) } -func (p *Piece) iterUndirtiedChunks(f func(cs ChunkSpec)) { - for i := pp.Integer(0); i < p.numChunks(); i++ { - if p.chunkIndexDirty(i) { +func (p *Piece) iterUndirtiedChunks(f func(cs chunkIndexType)) { + for i := chunkIndexType(0); i < chunkIndexType(p.numChunks()); i++ { + if p.chunkIndexDirty(pp.Integer(i)) { continue } - f(p.chunkIndexSpec(i)) + f(i) } } + +func (p *Piece) requestIndexOffset() RequestIndex { + return RequestIndex(p.index) * p.t.chunksPerRegularPiece() +} diff --git a/request-strategy/order.go b/request-strategy/order.go index 473b02b3..752198a6 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -10,11 +10,12 @@ import ( "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/storage" - pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/types" ) type ( + RequestIndex = uint32 + ChunkIndex = uint32 Request = types.Request pieceIndex = types.PieceIndex piecePriority = types.PiecePriority @@ -59,15 +60,13 @@ type requestsPeer struct { } func (rp *requestsPeer) canFitRequest() bool { - return len(rp.nextState.Requests) < rp.MaxRequests + return int(rp.nextState.Requests.GetCardinality()) < rp.MaxRequests } -func (rp *requestsPeer) addNextRequest(r Request) { - _, ok := rp.nextState.Requests[r] - if ok { +func (rp *requestsPeer) addNextRequest(r RequestIndex) { + if !rp.nextState.Requests.CheckedAdd(r) { panic("should only add once") } - rp.nextState.Requests[r] = struct{}{} } type peersForPieceRequests struct { @@ -75,7 +74,7 @@ type peersForPieceRequests struct { *requestsPeer } -func (me *peersForPieceRequests) addNextRequest(r Request) { +func (me *peersForPieceRequests) addNextRequest(r RequestIndex) { me.requestsPeer.addNextRequest(r) me.requestsInPiece++ } @@ -88,6 +87,10 @@ type requestablePiece struct { IterPendingChunks ChunksIter } +func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex { + return RequestIndex(p.t.ChunksPerPiece*p.index) + RequestIndex(c) +} + type filterPiece struct { t *filterTorrent index pieceIndex @@ -181,9 +184,6 @@ func Run(input Input) map[PeerId]PeerNextRequestState { for _, p := range t.Peers { peers = append(peers, &requestsPeer{ Peer: p, - nextState: PeerNextRequestState{ - Requests: make(map[Request]struct{}, p.MaxRequests), - }, }) } allPeers[t.InfoHash] = peers @@ -239,7 +239,7 @@ func makePeersForPiece(cap int) []*peersForPieceRequests { type peersForPieceSorter struct { peersForPiece []*peersForPieceRequests - req *Request + req *RequestIndex p requestablePiece } @@ -259,8 +259,8 @@ func (me *peersForPieceSorter) Less(_i, _j int) bool { byHasRequest := func() multiless.Computation { ml := multiless.New() if req != nil { - _, iHas := i.nextState.Requests[*req] - _, jHas := j.nextState.Requests[*req] + iHas := i.nextState.Requests.Contains(*req) + jHas := j.nextState.Requests.Contains(*req) ml = ml.Bool(jHas, iHas) } return ml @@ -327,16 +327,16 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) { peersForPiece: peersForPiece, p: p, } - sortPeersForPiece := func(req *Request) { + sortPeersForPiece := func(req *RequestIndex) { peersForPieceSorter.req = req sort.Sort(&peersForPieceSorter) //ensureValidSortedPeersForPieceRequests(&peersForPieceSorter) } // Chunks can be preassigned several times, if peers haven't been able to update their "actual" // with "next" request state before another request strategy run occurs. - preallocated := make(map[ChunkSpec][]*peersForPieceRequests, p.NumPendingChunks) - p.IterPendingChunks(func(spec ChunkSpec) { - req := Request{pp.Integer(p.index), spec} + preallocated := make([][]*peersForPieceRequests, p.t.ChunksPerPiece) + p.IterPendingChunks(func(spec ChunkIndex) { + req := p.chunkIndexToRequestIndex(spec) for _, peer := range peersForPiece { if h := peer.HasExistingRequest; h == nil || !h(req) { continue @@ -349,11 +349,11 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) { } }) pendingChunksRemaining := int(p.NumPendingChunks) - p.IterPendingChunks(func(chunk types.ChunkSpec) { - if _, ok := preallocated[chunk]; ok { + p.IterPendingChunks(func(chunk ChunkIndex) { + if len(preallocated[chunk]) != 0 { return } - req := Request{pp.Integer(p.index), chunk} + req := p.chunkIndexToRequestIndex(chunk) defer func() { pendingChunksRemaining-- }() sortPeersForPiece(nil) for _, peer := range peersForPiece { @@ -373,14 +373,17 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) { }) chunk: for chunk, prePeers := range preallocated { + if len(prePeers) == 0 { + continue + } pendingChunksRemaining-- - req := Request{pp.Integer(p.index), chunk} + req := p.chunkIndexToRequestIndex(ChunkIndex(chunk)) for _, pp := range prePeers { pp.requestsInPiece-- } sortPeersForPiece(&req) for _, pp := range prePeers { - delete(pp.nextState.Requests, req) + pp.nextState.Requests.Remove(req) } for _, peer := range peersForPiece { if !peer.canFitRequest() { diff --git a/request-strategy/order_test.go b/request-strategy/order_test.go index 89a85085..d15988f7 100644 --- a/request-strategy/order_test.go +++ b/request-strategy/order_test.go @@ -4,36 +4,29 @@ import ( "math" "testing" + "github.com/RoaringBitmap/roaring" qt "github.com/frankban/quicktest" - - pp "github.com/anacrolix/torrent/peer_protocol" + "github.com/google/go-cmp/cmp" ) -func r(i pieceIndex, begin int) Request { - return Request{pp.Integer(i), ChunkSpec{pp.Integer(begin), 1}} -} - -func chunkIterRange(end int) func(func(ChunkSpec)) { - return func(f func(ChunkSpec)) { - for offset := 0; offset < end; offset += 1 { - f(ChunkSpec{pp.Integer(offset), 1}) +func chunkIterRange(end ChunkIndex) ChunksIter { + return func(f func(ChunkIndex)) { + for offset := ChunkIndex(0); offset < end; offset += 1 { + f(offset) } } } -func chunkIter(offsets ...int) func(func(ChunkSpec)) { - return func(f func(ChunkSpec)) { +func chunkIter(offsets ...ChunkIndex) ChunksIter { + return func(f func(ChunkIndex)) { for _, offset := range offsets { - f(ChunkSpec{pp.Integer(offset), 1}) + f(offset) } } } -func requestSetFromSlice(rs ...Request) (ret map[Request]struct{}) { - ret = make(map[Request]struct{}, len(rs)) - for _, r := range rs { - ret[r] = struct{}{} - } +func requestSetFromSlice(rs ...RequestIndex) (ret roaring.Bitmap) { + ret.AddMany(rs) return } @@ -43,6 +36,8 @@ func (i intPeerId) Uintptr() uintptr { return uintptr(i) } +func hasAllRequests(RequestIndex) bool { return true } + func TestStealingFromSlowerPeer(t *testing.T) { c := qt.New(t) basePeer := Peer{ @@ -55,15 +50,14 @@ func TestStealingFromSlowerPeer(t *testing.T) { // Slower than the stealers, but has all requests already. stealee := basePeer stealee.DownloadRate = 1 - stealee.HasExistingRequest = func(r Request) bool { - return true - } + stealee.HasExistingRequest = hasAllRequests stealee.Id = intPeerId(1) firstStealer := basePeer firstStealer.Id = intPeerId(2) secondStealer := basePeer secondStealer.Id = intPeerId(3) results := Run(Input{Torrents: []Torrent{{ + ChunksPerPiece: 9, Pieces: []Piece{{ Request: true, NumPendingChunks: 5, @@ -77,8 +71,9 @@ func TestStealingFromSlowerPeer(t *testing.T) { }}}) c.Assert(results, qt.HasLen, 3) - check := func(p PeerId, l int) { - c.Check(results[p].Requests, qt.HasLen, l) + check := func(p PeerId, l uint64) { + addressableBm := results[p].Requests + c.Check(addressableBm.GetCardinality(), qt.ContentEquals, l) c.Check(results[p].Interested, qt.Equals, l > 0) } check(stealee.Id, 1) @@ -86,8 +81,9 @@ func TestStealingFromSlowerPeer(t *testing.T) { check(secondStealer.Id, 2) } -func checkNumRequestsAndInterest(c *qt.C, next PeerNextRequestState, num int, interest bool) { - c.Check(next.Requests, qt.HasLen, num) +func checkNumRequestsAndInterest(c *qt.C, next PeerNextRequestState, num uint64, interest bool) { + addressableBm := next.Requests + c.Check(addressableBm.GetCardinality(), qt.ContentEquals, num) c.Check(next.Interested, qt.Equals, interest) } @@ -102,15 +98,14 @@ func TestStealingFromSlowerPeersBasic(t *testing.T) { } stealee := basePeer stealee.DownloadRate = 1 - stealee.HasExistingRequest = func(r Request) bool { - return true - } + stealee.HasExistingRequest = hasAllRequests stealee.Id = intPeerId(1) firstStealer := basePeer firstStealer.Id = intPeerId(2) secondStealer := basePeer secondStealer.Id = intPeerId(3) results := Run(Input{Torrents: []Torrent{{ + ChunksPerPiece: 9, Pieces: []Piece{{ Request: true, NumPendingChunks: 2, @@ -128,6 +123,10 @@ func TestStealingFromSlowerPeersBasic(t *testing.T) { checkNumRequestsAndInterest(c, results[stealee.Id], 0, false) } +func checkResultsRequestsLen(t *testing.T, reqs roaring.Bitmap, l uint64) { + qt.Check(t, reqs.GetCardinality(), qt.Equals, l) +} + func TestPeerKeepsExistingIfReasonable(t *testing.T) { c := qt.New(t) basePeer := Peer{ @@ -140,8 +139,8 @@ func TestPeerKeepsExistingIfReasonable(t *testing.T) { // Slower than the stealers, but has all requests already. stealee := basePeer stealee.DownloadRate = 1 - keepReq := r(0, 0) - stealee.HasExistingRequest = func(r Request) bool { + keepReq := RequestIndex(0) + stealee.HasExistingRequest = func(r RequestIndex) bool { return r == keepReq } stealee.Id = intPeerId(1) @@ -150,6 +149,7 @@ func TestPeerKeepsExistingIfReasonable(t *testing.T) { secondStealer := basePeer secondStealer.Id = intPeerId(3) results := Run(Input{Torrents: []Torrent{{ + ChunksPerPiece: 9, Pieces: []Piece{{ Request: true, NumPendingChunks: 4, @@ -163,18 +163,29 @@ func TestPeerKeepsExistingIfReasonable(t *testing.T) { }}}) c.Assert(results, qt.HasLen, 3) - check := func(p PeerId, l int) { - c.Check(results[p].Requests, qt.HasLen, l) + check := func(p PeerId, l uint64) { + checkResultsRequestsLen(t, results[p].Requests, l) c.Check(results[p].Interested, qt.Equals, l > 0) } check(firstStealer.Id, 2) check(secondStealer.Id, 1) - c.Check(results[stealee.Id], qt.ContentEquals, PeerNextRequestState{ - Interested: true, - Requests: requestSetFromSlice(keepReq), - }) + c.Check( + results[stealee.Id], + peerNextRequestStateChecker, + PeerNextRequestState{ + Interested: true, + Requests: requestSetFromSlice(keepReq), + }, + ) } +var peerNextRequestStateChecker = qt.CmpEquals( + cmp.Transformer( + "bitmap", + func(bm roaring.Bitmap) []uint32 { + return bm.ToArray() + })) + func TestDontStealUnnecessarily(t *testing.T) { c := qt.New(t) basePeer := Peer{ @@ -187,12 +198,14 @@ func TestDontStealUnnecessarily(t *testing.T) { // Slower than the stealers, but has all requests already. stealee := basePeer stealee.DownloadRate = 1 + r := func(i, c RequestIndex) RequestIndex { + return i*9 + c + } keepReqs := requestSetFromSlice( r(3, 2), r(3, 4), r(3, 6), r(3, 8), r(4, 0), r(4, 1), r(4, 7), r(4, 8)) - stealee.HasExistingRequest = func(r Request) bool { - _, ok := keepReqs[r] - return ok + stealee.HasExistingRequest = func(r RequestIndex) bool { + return keepReqs.Contains(r) } stealee.Id = intPeerId(1) firstStealer := basePeer @@ -208,6 +221,7 @@ func TestDontStealUnnecessarily(t *testing.T) { } } results := Run(Input{Torrents: []Torrent{{ + ChunksPerPiece: 9, Pieces: []Piece{ { Request: true, @@ -242,16 +256,20 @@ func TestDontStealUnnecessarily(t *testing.T) { }}}) c.Assert(results, qt.HasLen, 3) - check := func(p PeerId, l int) { - c.Check(results[p].Requests, qt.HasLen, l) + check := func(p PeerId, l uint64) { + checkResultsRequestsLen(t, results[p].Requests, l) c.Check(results[p].Interested, qt.Equals, l > 0) } check(firstStealer.Id, 5) check(secondStealer.Id, 7+9) - c.Check(results[stealee.Id], qt.ContentEquals, PeerNextRequestState{ - Interested: true, - Requests: requestSetFromSlice(r(4, 0), r(4, 1), r(4, 7), r(4, 8)), - }) + c.Check( + results[stealee.Id], + peerNextRequestStateChecker, + PeerNextRequestState{ + Interested: true, + Requests: requestSetFromSlice(r(4, 0), r(4, 1), r(4, 7), r(4, 8)), + }, + ) } // This tests a situation where multiple peers had the same existing request, due to "actual" and @@ -260,10 +278,8 @@ func TestDontStealUnnecessarily(t *testing.T) { func TestDuplicatePreallocations(t *testing.T) { peer := func(id int, downloadRate float64) Peer { return Peer{ - HasExistingRequest: func(r Request) bool { - return true - }, - MaxRequests: 2, + HasExistingRequest: hasAllRequests, + MaxRequests: 2, HasPiece: func(i pieceIndex) bool { return true }, @@ -273,6 +289,7 @@ func TestDuplicatePreallocations(t *testing.T) { } results := Run(Input{ Torrents: []Torrent{{ + ChunksPerPiece: 1, Pieces: []Piece{{ Request: true, NumPendingChunks: 1, @@ -292,5 +309,7 @@ func TestDuplicatePreallocations(t *testing.T) { }}, }) c := qt.New(t) - c.Assert(2, qt.Equals, len(results[intPeerId(1)].Requests)+len(results[intPeerId(2)].Requests)) + req1 := results[intPeerId(1)].Requests + req2 := results[intPeerId(2)].Requests + c.Assert(uint64(2), qt.Equals, req1.GetCardinality()+req2.GetCardinality()) } diff --git a/request-strategy/peer.go b/request-strategy/peer.go index b6222064..ece8ea42 100644 --- a/request-strategy/peer.go +++ b/request-strategy/peer.go @@ -2,11 +2,13 @@ package request_strategy import ( "time" + + "github.com/RoaringBitmap/roaring" ) type PeerNextRequestState struct { Interested bool - Requests map[Request]struct{} + Requests roaring.Bitmap } type PeerId interface { @@ -16,7 +18,7 @@ type PeerId interface { type Peer struct { HasPiece func(i pieceIndex) bool MaxRequests int - HasExistingRequest func(r Request) bool + HasExistingRequest func(r RequestIndex) bool Choking bool PieceAllowedFast func(pieceIndex) bool DownloadRate float64 diff --git a/request-strategy/piece.go b/request-strategy/piece.go index bc59c052..2dbe0bcf 100644 --- a/request-strategy/piece.go +++ b/request-strategy/piece.go @@ -1,10 +1,6 @@ package request_strategy -import ( - "github.com/anacrolix/torrent/types" -) - -type ChunksIter func(func(types.ChunkSpec)) +type ChunksIter func(func(ChunkIndex)) type Piece struct { Request bool @@ -16,7 +12,7 @@ type Piece struct { IterPendingChunks ChunksIter } -func (p Piece) iterPendingChunksWrapper(f func(ChunkSpec)) { +func (p Piece) iterPendingChunksWrapper(f func(ChunkIndex)) { i := p.IterPendingChunks if i != nil { i(f) diff --git a/request-strategy/torrent.go b/request-strategy/torrent.go index a92bffc0..262ae965 100644 --- a/request-strategy/torrent.go +++ b/request-strategy/torrent.go @@ -11,7 +11,8 @@ type Torrent struct { // Unclosed Peers. Not necessary for getting requestable piece ordering. Peers []Peer // Some value that's unique and stable between runs. Could even use the infohash? - InfoHash metainfo.Hash + InfoHash metainfo.Hash + ChunksPerPiece int MaxUnverifiedBytes int64 } diff --git a/requesting.go b/requesting.go index 29a6dd96..9f896738 100644 --- a/requesting.go +++ b/requesting.go @@ -5,7 +5,6 @@ import ( "unsafe" "github.com/anacrolix/missinggo/v2/bitmap" - pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/chansync" request_strategy "github.com/anacrolix/torrent/request-strategy" @@ -43,8 +42,13 @@ func (cl *Client) tickleRequester() { func (cl *Client) getRequestStrategyInput() request_strategy.Input { ts := make([]request_strategy.Torrent, 0, len(cl.torrents)) for _, t := range cl.torrents { + if !t.haveInfo() { + // This would be removed if metadata is handled here. + continue + } rst := request_strategy.Torrent{ - InfoHash: t.infoHash, + InfoHash: t.infoHash, + ChunksPerPiece: (t.usualPieceSize() + int(t.chunkSize) - 1) / int(t.chunkSize), } if t.storage != nil { rst.Capacity = t.storage.Capacity @@ -73,9 +77,8 @@ func (cl *Client) getRequestStrategyInput() request_strategy.Input { rst.Peers = append(rst.Peers, request_strategy.Peer{ HasPiece: p.peerHasPiece, MaxRequests: p.nominalMaxRequests(), - HasExistingRequest: func(r request_strategy.Request) bool { - _, ok := p.actualRequestState.Requests[r] - return ok + HasExistingRequest: func(r RequestIndex) bool { + return p.actualRequestState.Requests.Contains(r) }, Choking: p.peerChoking, PieceAllowedFast: func(i pieceIndex) bool { @@ -119,8 +122,11 @@ func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.Pee p.onNextRequestStateChanged() } +type RequestIndex = request_strategy.RequestIndex +type chunkIndexType = request_strategy.ChunkIndex + func (p *Peer) applyNextRequestState() bool { - if len(p.actualRequestState.Requests) > p.nominalMaxRequests()/2 { + if p.actualRequestState.Requests.GetCardinality() > uint64(p.nominalMaxRequests()/2) { return true } type piece struct { @@ -148,8 +154,8 @@ func (p *Peer) applyNextRequestState() bool { for _, endGameIter := range []bool{false, true} { for _, piece := range pieceOrder { tp := p.t.piece(piece.index) - tp.iterUndirtiedChunks(func(cs ChunkSpec) { - req := Request{pp.Integer(piece.index), cs} + tp.iterUndirtiedChunks(func(cs chunkIndexType) { + req := cs + tp.requestIndexOffset() if !piece.endGame && !endGameIter && p.t.pendingRequests[req] > 0 { return } @@ -158,10 +164,10 @@ func (p *Peer) applyNextRequestState() bool { if !more { return } - if len(p.actualRequestState.Requests) >= p.nominalMaxRequests() { + if maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() { return } - if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req.Index)) { + if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(piece.index)) { return } var err error @@ -170,7 +176,7 @@ func (p *Peer) applyNextRequestState() bool { panic(err) } }) - if interested && len(p.actualRequestState.Requests) >= p.nominalMaxRequests() { + if interested && maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() { break } if !more { diff --git a/torrent.go b/torrent.go index a584874b..bf7968cf 100644 --- a/torrent.go +++ b/torrent.go @@ -143,7 +143,7 @@ type Torrent struct { connPieceInclinationPool sync.Pool // Count of each request across active connections. - pendingRequests map[Request]int + pendingRequests map[RequestIndex]int pex pexState } @@ -440,7 +440,7 @@ func (t *Torrent) onSetInfo() { t.cl.event.Broadcast() close(t.gotMetainfoC) t.updateWantPeersEvent() - t.pendingRequests = make(map[Request]int) + t.pendingRequests = make(map[RequestIndex]int) t.tryCreateMorePieceHashers() } @@ -842,8 +842,12 @@ func (t *Torrent) bitfield() (bf []bool) { return } -func (t *Torrent) pieceNumChunks(piece pieceIndex) pp.Integer { - return (t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize +func (t *Torrent) chunksPerRegularPiece() uint32 { + return uint32((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize) +} + +func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType { + return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize) } func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) { @@ -940,8 +944,8 @@ func (t *Torrent) haveChunk(r Request) (ret bool) { return !p.pendingChunk(r.ChunkSpec, t.chunkSize) } -func chunkIndex(cs ChunkSpec, chunkSize pp.Integer) int { - return int(cs.Begin / chunkSize) +func chunkIndexFromChunkSpec(cs ChunkSpec, chunkSize pp.Integer) chunkIndexType { + return chunkIndexType(cs.Begin / chunkSize) } func (t *Torrent) wantPieceIndex(index pieceIndex) bool { @@ -1033,7 +1037,7 @@ func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer { if t.pieceComplete(piece) { return 0 } - return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks() + return pp.Integer(t.pieceNumChunks(piece)) - t.pieces[piece].numDirtyChunks() } func (t *Torrent) pieceAllDirty(piece pieceIndex) bool { @@ -1170,9 +1174,8 @@ func (t *Torrent) piecePriority(piece pieceIndex) piecePriority { return ret } -func (t *Torrent) pendRequest(req Request) { - ci := chunkIndex(req.ChunkSpec, t.chunkSize) - t.pieces[req.Index].pendChunkIndex(ci) +func (t *Torrent) pendRequest(req RequestIndex) { + t.piece(int(req / t.chunksPerRegularPiece())).pendChunkIndex(req % t.chunksPerRegularPiece()) } func (t *Torrent) pieceCompletionChanged(piece pieceIndex) { @@ -2259,3 +2262,19 @@ func (t *Torrent) peerIsActive(p *Peer) (active bool) { }) return } + +func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request { + index := ri / t.chunksPerRegularPiece() + return Request{ + pp.Integer(index), + t.piece(int(index)).chunkIndexSpec(pp.Integer(ri % t.chunksPerRegularPiece())), + } +} + +func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex { + return t.chunksPerRegularPiece()*uint32(r.Index) + uint32(r.Begin/t.chunkSize) +} + +func (t *Torrent) numChunks() RequestIndex { + return RequestIndex((t.Length() + int64(t.chunkSize) - 1) / int64(t.chunkSize)) +} diff --git a/webseed-peer.go b/webseed-peer.go index f2ccbff5..9263aa46 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -78,11 +78,17 @@ func (ws *webseedPeer) requester() { defer ws.requesterCond.L.Unlock() start: for !ws.peer.closed.IsSet() { - for r := range ws.peer.actualRequestState.Requests { + restart := false + ws.peer.actualRequestState.Requests.Iterate(func(x uint32) bool { + r := ws.peer.t.requestIndexToRequest(x) if _, ok := ws.activeRequests[r]; ok { - continue + return true } ws.doRequest(r) + restart = true + return false + }) + if restart { goto start } ws.requesterCond.Wait() @@ -134,7 +140,7 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re }() { ws.peer.close() } else { - ws.peer.remoteRejectedRequest(r) + ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) } } else { err := ws.peer.receiveChunk(&pp.Message{ -- 2.48.1