// Chunks that we might reasonably expect to receive from the peer. Due to
// latency, buffering, and implementation differences, we may receive
// chunks that are no longer in the set of requests we actually want.
- validReceiveChunks map[Request]int
+ validReceiveChunks map[RequestIndex]int
// Indexed by metadata piece, set to true if posted and pending a
// response.
metadataRequests []bool
}
+// expectingChunks: may the peer legitimately send us chunk data right now,
+// given our outstanding requests, interest, and choke/allowed-fast state?
func (cn *Peer) expectingChunks() bool {
- if len(cn.actualRequestState.Requests) == 0 {
+ if cn.actualRequestState.Requests.IsEmpty() {
return false
}
if !cn.actualRequestState.Interested {
return false
}
- for r := range cn.actualRequestState.Requests {
- if !cn.remoteChokingPiece(r.Index.Int()) {
- return true
- }
+ // Rank((i+1)*cpp)-Rank(i*cpp) counts outstanding request indices that fall
+ // inside allowed-fast piece i (_i is a piece index; the RequestIndex cast
+ // is only for the index arithmetic).
+ if cn.peerAllowedFast.IterTyped(func(_i int) bool {
+ i := RequestIndex(_i)
+ return cn.actualRequestState.Requests.Rank((i+1)*cn.t.chunksPerRegularPiece())-
+ cn.actualRequestState.Requests.Rank(i*cn.t.chunksPerRegularPiece()) == 0
+ }) {
+ return true
}
+ // NOTE(review): IterTyped completes (returns true) only when the callback
+ // returned true for every allowed-fast piece, i.e. when NONE of them hold
+ // any of our requests — yet that case is treated as "expecting chunks".
+ // That looks inverted relative to the old loop, which returned true when an
+ // unchoked piece *had* a request. Confirm the intended semantics.
- return false
+ return !cn.peerChoking
}
func (cn *Peer) remoteChokingPiece(piece pieceIndex) bool {
+// numRequestsByPiece buckets the count of outstanding requests by piece index.
func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
ret = make(map[pieceIndex]int)
- for r := range cn.actualRequestState.Requests {
- ret[pieceIndex(r.Index)]++
- }
+ // Request indices are piece*chunksPerRegularPiece+chunk, so integer
+ // division recovers the piece an index belongs to.
+ cn.actualRequestState.Requests.Iterate(func(x uint32) bool {
+ ret[pieceIndex(x/cn.t.chunksPerRegularPiece())]++
+ return true
+ })
return
}
&cn._stats.ChunksReadUseful,
&cn._stats.ChunksRead,
&cn._stats.ChunksWritten,
- len(cn.actualRequestState.Requests),
+ cn.actualRequestState.Requests.GetCardinality(),
cn.nominalMaxRequests(),
cn.PeerMaxRequests,
len(cn.peerRequests),
// are okay.
type messageWriter func(pp.Message) bool
+// shouldRequest returns nil when r is sane to request from this peer; it
+// returns an error for a piece the peer lacks, and panics on states that
+// indicate a caller bug (closed connection, hashing/queued piece, choked
+// without allowed-fast).
-func (cn *Peer) shouldRequest(r Request) error {
- if !cn.peerHasPiece(pieceIndex(r.Index)) {
+func (cn *Peer) shouldRequest(r RequestIndex) error {
+ // Derive the piece index once; every check below is per-piece.
+ pi := pieceIndex(r / cn.t.chunksPerRegularPiece())
+ if !cn.peerHasPiece(pi) {
return errors.New("requesting piece peer doesn't have")
}
if !cn.t.peerIsActive(cn) {
if cn.closed.IsSet() {
panic("requesting when connection is closed")
}
- if cn.t.hashingPiece(pieceIndex(r.Index)) {
+ if cn.t.hashingPiece(pi) {
panic("piece is being hashed")
}
- if cn.t.pieceQueuedForHash(pieceIndex(r.Index)) {
+ if cn.t.pieceQueuedForHash(pi) {
panic("piece is queued for hash")
}
- if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(r.Index)) {
+ if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(pi)) {
panic("peer choking and piece not allowed fast")
}
return nil
}
+// request records r as outstanding (bitmap, validReceiveChunks, torrent-wide
+// pendingRequests) and forwards it to the peer implementation. Returns
+// more=false when the peer can't accept further writes.
-func (cn *Peer) request(r Request) (more bool, err error) {
+func (cn *Peer) request(r RequestIndex) (more bool, err error) {
if err := cn.shouldRequest(r); err != nil {
panic(err)
}
- if _, ok := cn.actualRequestState.Requests[r]; ok {
+ if cn.actualRequestState.Requests.Contains(r) {
return true, nil
}
- if len(cn.actualRequestState.Requests) >= cn.nominalMaxRequests() {
+ if maxRequests(cn.actualRequestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
return true, errors.New("too many outstanding requests")
}
- if cn.actualRequestState.Requests == nil {
- cn.actualRequestState.Requests = make(map[Request]struct{})
- }
- cn.actualRequestState.Requests[r] = struct{}{}
+ cn.actualRequestState.Requests.Add(r)
if cn.validReceiveChunks == nil {
- cn.validReceiveChunks = make(map[Request]int)
+ cn.validReceiveChunks = make(map[RequestIndex]int)
}
cn.validReceiveChunks[r]++
cn.t.pendingRequests[r]++
cn.updateExpectingChunks()
+ // Callbacks and the wire still speak protocol-level Requests; convert once.
+ ppReq := cn.t.requestIndexToRequest(r)
for _, f := range cn.callbacks.SentRequest {
- f(PeerRequestEvent{cn, r})
+ f(PeerRequestEvent{cn, ppReq})
}
- return cn.peerImpl._request(r), nil
+ return cn.peerImpl._request(ppReq), nil
}
func (me *PeerConn) _request(r Request) bool {
})
}
+// cancel removes a tracked request and, if it was present, asks the peer
+// implementation to cancel the protocol-level request.
-func (me *Peer) cancel(r Request) bool {
+func (me *Peer) cancel(r RequestIndex) bool {
if me.deleteRequest(r) {
- return me.peerImpl._cancel(r)
+ return me.peerImpl._cancel(me.t.requestIndexToRequest(r))
}
return true
}
}
func (cn *PeerConn) updateRequests() {
- if len(cn.actualRequestState.Requests) != 0 {
+ if cn.actualRequestState.Requests.GetCardinality() != 0 {
return
}
cn.tickleWriter()
case pp.HaveNone:
err = c.peerSentHaveNone()
case pp.Reject:
- c.remoteRejectedRequest(newRequestFromMessage(&msg))
+ c.remoteRejectedRequest(c.t.requestIndexFromRequest(newRequestFromMessage(&msg)))
case pp.AllowedFast:
torrent.Add("allowed fasts received", 1)
log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
}
}
+// remoteRejectedRequest handles a Reject (fast extension): drop the request
+// and stop expecting the corresponding chunk.
-func (c *Peer) remoteRejectedRequest(r Request) {
+func (c *Peer) remoteRejectedRequest(r RequestIndex) {
if c.deleteRequest(r) {
c.decExpectedChunkReceive(r)
}
}
-func (c *Peer) decExpectedChunkReceive(r Request) {
+func (c *Peer) decExpectedChunkReceive(r RequestIndex) {
count := c.validReceiveChunks[r]
if count == 1 {
delete(c.validReceiveChunks, r)
func (c *Peer) receiveChunk(msg *pp.Message) error {
chunksReceived.Add("total", 1)
- req := newRequestFromMessage(msg)
+ ppReq := newRequestFromMessage(msg)
+ req := c.t.requestIndexFromRequest(ppReq)
if c.peerChoking {
chunksReceived.Add("while choked", 1)
}
c.decExpectedChunkReceive(req)
- if c.peerChoking && c.peerAllowedFast.Get(bitmap.BitIndex(req.Index)) {
+ if c.peerChoking && c.peerAllowedFast.Get(bitmap.BitIndex(ppReq.Index)) {
chunksReceived.Add("due to allowed fast", 1)
}
// out.
deletedRequest := false
{
- if _, ok := c.actualRequestState.Requests[req]; ok {
+ if c.actualRequestState.Requests.Contains(req) {
for _, f := range c.callbacks.ReceivedRequested {
f(PeerMessageEvent{c, msg})
}
cl := t.cl
// Do we actually want this chunk?
- if t.haveChunk(req) {
+ if t.haveChunk(ppReq) {
chunksReceived.Add("wasted", 1)
c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
return nil
}
- piece := &t.pieces[req.Index]
+ piece := &t.pieces[ppReq.Index]
c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
piece.incrementPendingWrites()
// Record that we have the chunk, so we aren't trying to download it while
// waiting for it to be written to storage.
- piece.unpendChunkIndex(chunkIndex(req.ChunkSpec, t.chunkSize))
+ piece.unpendChunkIndex(chunkIndexFromChunkSpec(ppReq.ChunkSpec, t.chunkSize))
// Cancel pending requests for this chunk from *other* peers.
t.iterPeers(func(p *Peer) {
return nil
}
- c.onDirtiedPiece(pieceIndex(req.Index))
+ c.onDirtiedPiece(pieceIndex(ppReq.Index))
// We need to ensure the piece is only queued once, so only the last chunk writer gets this job.
- if t.pieceAllDirty(pieceIndex(req.Index)) && piece.pendingWrites == 0 {
- t.queuePieceCheck(pieceIndex(req.Index))
+ if t.pieceAllDirty(pieceIndex(ppReq.Index)) && piece.pendingWrites == 0 {
+ t.queuePieceCheck(pieceIndex(ppReq.Index))
// We don't pend all chunks here anymore because we don't want code dependent on the dirty
// chunk status (such as the haveChunk call above) to have to check all the various other
// piece states like queued for hash, hashing etc. This does mean that we need to be sure
cl.event.Broadcast()
// We do this because we've written a chunk, and may change PieceState.Partial.
- t.publishPieceChange(pieceIndex(req.Index))
+ t.publishPieceChange(pieceIndex(ppReq.Index))
return nil
}
return !c._pieceRequestOrder.IsEmpty()
}
-func (c *Peer) deleteRequest(r Request) bool {
- delete(c.nextRequestState.Requests, r)
- if _, ok := c.actualRequestState.Requests[r]; !ok {
+func (c *Peer) deleteRequest(r RequestIndex) bool {
+ c.nextRequestState.Requests.Remove(r)
+ if !c.actualRequestState.Requests.CheckedRemove(r) {
return false
}
- delete(c.actualRequestState.Requests, r)
for _, f := range c.callbacks.DeletedRequest {
- f(PeerRequestEvent{c, r})
+ f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)})
}
c.updateExpectingChunks()
pr := c.t.pendingRequests
}
func (c *Peer) deleteAllRequests() {
- for r := range c.actualRequestState.Requests {
- c.deleteRequest(r)
- }
- if l := len(c.actualRequestState.Requests); l != 0 {
- panic(l)
+ c.actualRequestState.Requests.Clone().Iterate(func(x uint32) bool {
+ c.deleteRequest(x)
+ return true
+ })
+ if !c.actualRequestState.Requests.IsEmpty() {
+ panic(c.actualRequestState.Requests.GetCardinality())
}
- c.nextRequestState.Requests = nil
+ c.nextRequestState.Requests.Clear()
// for c := range c.t.conns {
// c.tickleWriter()
// }
cl.initLogger()
c := cl.newConnection(nil, false, nil, "io.Pipe", "")
c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
- if err := c.t.setInfo(&metainfo.Info{ Pieces: make([]byte, metainfo.HashSize*3) }); err != nil {
+ if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil {
t.Log(err)
}
r, w := io.Pipe()
// The chunk must be written to storage every time, to ensure the
// writeSem is unlocked.
t.pieces[0]._dirtyChunks.Clear()
- cn.validReceiveChunks = map[Request]int{newRequestFromMessage(&msg): 1}
+ cn.validReceiveChunks = map[RequestIndex]int{
+ t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1,
+ }
cl.unlock()
n, err := w.Write(wb)
require.NoError(b, err)
return p.t.storage.Piece(p.Info())
}
+// pendingChunkIndex: true if the chunk is still wanted (not marked dirty).
-func (p *Piece) pendingChunkIndex(chunkIndex int) bool {
+func (p *Piece) pendingChunkIndex(chunkIndex chunkIndexType) bool {
return !p._dirtyChunks.Contains(bitmap.BitIndex(chunkIndex))
}
func (p *Piece) pendingChunk(cs ChunkSpec, chunkSize pp.Integer) bool {
- return p.pendingChunkIndex(chunkIndex(cs, chunkSize))
+ return p.pendingChunkIndex(chunkIndexFromChunkSpec(cs, chunkSize))
}
func (p *Piece) hasDirtyChunks() bool {
return pp.Integer(p._dirtyChunks.Len())
}
+// unpendChunkIndex marks chunk i dirty (received) and wakes piece readers.
-func (p *Piece) unpendChunkIndex(i int) {
+func (p *Piece) unpendChunkIndex(i chunkIndexType) {
p._dirtyChunks.Add(bitmap.BitIndex(i))
p.readerCond.Broadcast()
}
+// pendChunkIndex marks chunk i as wanted again (clears dirty).
+// NOTE(review): the parameter is declared RequestIndex while unpendChunkIndex
+// takes chunkIndexType; both alias uint32 and the caller (pendRequest) passes
+// a piece-relative chunk index — chunkIndexType would be the consistent type.
-func (p *Piece) pendChunkIndex(i int) {
+func (p *Piece) pendChunkIndex(i RequestIndex) {
p._dirtyChunks.Remove(bitmap.BitIndex(i))
}
func (p *Piece) numChunks() pp.Integer {
+ // pieceNumChunks now returns chunkIndexType; convert for this pp.Integer API.
- return p.t.pieceNumChunks(p.index)
+ return pp.Integer(p.t.pieceNumChunks(p.index))
}
func (p *Piece) incrementPendingWrites() {
return p.t.PieceState(p.index)
}
+// iterUndirtiedChunks calls f with the piece-relative index of every chunk
+// not yet received (not dirty).
-func (p *Piece) iterUndirtiedChunks(f func(cs ChunkSpec)) {
- for i := pp.Integer(0); i < p.numChunks(); i++ {
- if p.chunkIndexDirty(i) {
+func (p *Piece) iterUndirtiedChunks(f func(cs chunkIndexType)) {
+ for i := chunkIndexType(0); i < chunkIndexType(p.numChunks()); i++ {
+ if p.chunkIndexDirty(pp.Integer(i)) {
continue
}
- f(p.chunkIndexSpec(i))
+ f(i)
}
}
+
+// requestIndexOffset is the request index of this piece's first chunk.
+func (p *Piece) requestIndexOffset() RequestIndex {
+ return RequestIndex(p.index) * p.t.chunksPerRegularPiece()
+}
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
- pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/types"
)
type (
+ RequestIndex = uint32
+ ChunkIndex = uint32
Request = types.Request
pieceIndex = types.PieceIndex
piecePriority = types.PiecePriority
}
func (rp *requestsPeer) canFitRequest() bool {
- return len(rp.nextState.Requests) < rp.MaxRequests
+ return int(rp.nextState.Requests.GetCardinality()) < rp.MaxRequests
}
+// addNextRequest adds r to the peer's planned next request state. Adding the
+// same index twice is a planner bug, hence the panic.
-func (rp *requestsPeer) addNextRequest(r Request) {
- _, ok := rp.nextState.Requests[r]
- if ok {
+func (rp *requestsPeer) addNextRequest(r RequestIndex) {
+ // CheckedAdd reports false when r was already present.
+ if !rp.nextState.Requests.CheckedAdd(r) {
panic("should only add once")
}
- rp.nextState.Requests[r] = struct{}{}
}
type peersForPieceRequests struct {
*requestsPeer
}
+// addNextRequest also tracks how many requests were assigned in this piece.
-func (me *peersForPieceRequests) addNextRequest(r Request) {
+func (me *peersForPieceRequests) addNextRequest(r RequestIndex) {
me.requestsPeer.addNextRequest(r)
me.requestsInPiece++
}
IterPendingChunks ChunksIter
}
+// chunkIndexToRequestIndex maps a piece-relative chunk index to the
+// torrent-wide request index: piece*ChunksPerPiece + chunk.
+func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex {
+ return RequestIndex(p.t.ChunksPerPiece*p.index) + RequestIndex(c)
+}
+
type filterPiece struct {
t *filterTorrent
index pieceIndex
for _, p := range t.Peers {
peers = append(peers, &requestsPeer{
Peer: p,
- nextState: PeerNextRequestState{
- Requests: make(map[Request]struct{}, p.MaxRequests),
- },
})
}
allPeers[t.InfoHash] = peers
type peersForPieceSorter struct {
peersForPiece []*peersForPieceRequests
- req *Request
+ req *RequestIndex
p requestablePiece
}
byHasRequest := func() multiless.Computation {
ml := multiless.New()
if req != nil {
- _, iHas := i.nextState.Requests[*req]
- _, jHas := j.nextState.Requests[*req]
+ iHas := i.nextState.Requests.Contains(*req)
+ jHas := j.nextState.Requests.Contains(*req)
ml = ml.Bool(jHas, iHas)
}
return ml
peersForPiece: peersForPiece,
p: p,
}
- sortPeersForPiece := func(req *Request) {
+ sortPeersForPiece := func(req *RequestIndex) {
peersForPieceSorter.req = req
sort.Sort(&peersForPieceSorter)
//ensureValidSortedPeersForPieceRequests(&peersForPieceSorter)
}
// Chunks can be preassigned several times, if peers haven't been able to update their "actual"
// with "next" request state before another request strategy run occurs.
- preallocated := make(map[ChunkSpec][]*peersForPieceRequests, p.NumPendingChunks)
- p.IterPendingChunks(func(spec ChunkSpec) {
- req := Request{pp.Integer(p.index), spec}
+ preallocated := make([][]*peersForPieceRequests, p.t.ChunksPerPiece)
+ p.IterPendingChunks(func(spec ChunkIndex) {
+ req := p.chunkIndexToRequestIndex(spec)
for _, peer := range peersForPiece {
if h := peer.HasExistingRequest; h == nil || !h(req) {
continue
}
})
pendingChunksRemaining := int(p.NumPendingChunks)
- p.IterPendingChunks(func(chunk types.ChunkSpec) {
- if _, ok := preallocated[chunk]; ok {
+ p.IterPendingChunks(func(chunk ChunkIndex) {
+ if len(preallocated[chunk]) != 0 {
return
}
- req := Request{pp.Integer(p.index), chunk}
+ req := p.chunkIndexToRequestIndex(chunk)
defer func() { pendingChunksRemaining-- }()
sortPeersForPiece(nil)
for _, peer := range peersForPiece {
})
chunk:
for chunk, prePeers := range preallocated {
+ if len(prePeers) == 0 {
+ continue
+ }
pendingChunksRemaining--
- req := Request{pp.Integer(p.index), chunk}
+ req := p.chunkIndexToRequestIndex(ChunkIndex(chunk))
for _, pp := range prePeers {
pp.requestsInPiece--
}
sortPeersForPiece(&req)
for _, pp := range prePeers {
- delete(pp.nextState.Requests, req)
+ pp.nextState.Requests.Remove(req)
}
for _, peer := range peersForPiece {
if !peer.canFitRequest() {
"math"
"testing"
+ "github.com/RoaringBitmap/roaring"
qt "github.com/frankban/quicktest"
-
- pp "github.com/anacrolix/torrent/peer_protocol"
+ "github.com/google/go-cmp/cmp"
)
-func r(i pieceIndex, begin int) Request {
- return Request{pp.Integer(i), ChunkSpec{pp.Integer(begin), 1}}
-}
-
-func chunkIterRange(end int) func(func(ChunkSpec)) {
- return func(f func(ChunkSpec)) {
- for offset := 0; offset < end; offset += 1 {
- f(ChunkSpec{pp.Integer(offset), 1})
+// chunkIterRange yields chunk indices 0..end-1, for test piece fixtures.
+func chunkIterRange(end ChunkIndex) ChunksIter {
+ return func(f func(ChunkIndex)) {
+ for offset := ChunkIndex(0); offset < end; offset += 1 {
+ f(offset)
}
}
}
+// chunkIter yields exactly the given chunk indices, for test piece fixtures.
-func chunkIter(offsets ...int) func(func(ChunkSpec)) {
- return func(f func(ChunkSpec)) {
+func chunkIter(offsets ...ChunkIndex) ChunksIter {
+ return func(f func(ChunkIndex)) {
for _, offset := range offsets {
- f(ChunkSpec{pp.Integer(offset), 1})
+ f(offset)
}
}
}
+// requestSetFromSlice builds a request-index bitmap from the given indices.
-func requestSetFromSlice(rs ...Request) (ret map[Request]struct{}) {
- ret = make(map[Request]struct{}, len(rs))
- for _, r := range rs {
- ret[r] = struct{}{}
- }
+func requestSetFromSlice(rs ...RequestIndex) (ret roaring.Bitmap) {
+ ret.AddMany(rs)
return
}
return uintptr(i)
}
+// hasAllRequests is a HasExistingRequest stub claiming every request is held.
+func hasAllRequests(RequestIndex) bool { return true }
+
func TestStealingFromSlowerPeer(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
// Slower than the stealers, but has all requests already.
stealee := basePeer
stealee.DownloadRate = 1
- stealee.HasExistingRequest = func(r Request) bool {
- return true
- }
+ stealee.HasExistingRequest = hasAllRequests
stealee.Id = intPeerId(1)
firstStealer := basePeer
firstStealer.Id = intPeerId(2)
secondStealer := basePeer
secondStealer.Id = intPeerId(3)
results := Run(Input{Torrents: []Torrent{{
+ ChunksPerPiece: 9,
Pieces: []Piece{{
Request: true,
NumPendingChunks: 5,
}}})
c.Assert(results, qt.HasLen, 3)
- check := func(p PeerId, l int) {
- c.Check(results[p].Requests, qt.HasLen, l)
+ check := func(p PeerId, l uint64) {
+ addressableBm := results[p].Requests
+ c.Check(addressableBm.GetCardinality(), qt.ContentEquals, l)
c.Check(results[p].Interested, qt.Equals, l > 0)
}
check(stealee.Id, 1)
check(secondStealer.Id, 2)
}
-func checkNumRequestsAndInterest(c *qt.C, next PeerNextRequestState, num int, interest bool) {
- c.Check(next.Requests, qt.HasLen, num)
+func checkNumRequestsAndInterest(c *qt.C, next PeerNextRequestState, num uint64, interest bool) {
+ addressableBm := next.Requests
+ c.Check(addressableBm.GetCardinality(), qt.ContentEquals, num)
c.Check(next.Interested, qt.Equals, interest)
}
}
stealee := basePeer
stealee.DownloadRate = 1
- stealee.HasExistingRequest = func(r Request) bool {
- return true
- }
+ stealee.HasExistingRequest = hasAllRequests
stealee.Id = intPeerId(1)
firstStealer := basePeer
firstStealer.Id = intPeerId(2)
secondStealer := basePeer
secondStealer.Id = intPeerId(3)
results := Run(Input{Torrents: []Torrent{{
+ ChunksPerPiece: 9,
Pieces: []Piece{{
Request: true,
NumPendingChunks: 2,
checkNumRequestsAndInterest(c, results[stealee.Id], 0, false)
}
+// checkResultsRequestsLen asserts the number of requests assigned to a peer.
+func checkResultsRequestsLen(t *testing.T, reqs roaring.Bitmap, l uint64) {
+ qt.Check(t, reqs.GetCardinality(), qt.Equals, l)
+}
+
func TestPeerKeepsExistingIfReasonable(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
// Slower than the stealers, but has all requests already.
stealee := basePeer
stealee.DownloadRate = 1
- keepReq := r(0, 0)
- stealee.HasExistingRequest = func(r Request) bool {
+ keepReq := RequestIndex(0)
+ stealee.HasExistingRequest = func(r RequestIndex) bool {
return r == keepReq
}
stealee.Id = intPeerId(1)
secondStealer := basePeer
secondStealer.Id = intPeerId(3)
results := Run(Input{Torrents: []Torrent{{
+ ChunksPerPiece: 9,
Pieces: []Piece{{
Request: true,
NumPendingChunks: 4,
}}})
c.Assert(results, qt.HasLen, 3)
- check := func(p PeerId, l int) {
- c.Check(results[p].Requests, qt.HasLen, l)
+ check := func(p PeerId, l uint64) {
+ checkResultsRequestsLen(t, results[p].Requests, l)
c.Check(results[p].Interested, qt.Equals, l > 0)
}
check(firstStealer.Id, 2)
check(secondStealer.Id, 1)
- c.Check(results[stealee.Id], qt.ContentEquals, PeerNextRequestState{
- Interested: true,
- Requests: requestSetFromSlice(keepReq),
- })
+ c.Check(
+ results[stealee.Id],
+ peerNextRequestStateChecker,
+ PeerNextRequestState{
+ Interested: true,
+ Requests: requestSetFromSlice(keepReq),
+ },
+ )
}
+// peerNextRequestStateChecker compares PeerNextRequestState values by
+// transforming bitmaps to their element slices, so equality is over set
+// contents rather than the bitmap's internal representation (presumably
+// roaring internals differ for equal sets).
+var peerNextRequestStateChecker = qt.CmpEquals(
+ cmp.Transformer(
+ "bitmap",
+ func(bm roaring.Bitmap) []uint32 {
+ return bm.ToArray()
+ }))
+
func TestDontStealUnnecessarily(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
// Slower than the stealers, but has all requests already.
stealee := basePeer
stealee.DownloadRate = 1
+ r := func(i, c RequestIndex) RequestIndex {
+ return i*9 + c
+ }
keepReqs := requestSetFromSlice(
r(3, 2), r(3, 4), r(3, 6), r(3, 8),
r(4, 0), r(4, 1), r(4, 7), r(4, 8))
- stealee.HasExistingRequest = func(r Request) bool {
- _, ok := keepReqs[r]
- return ok
+ stealee.HasExistingRequest = func(r RequestIndex) bool {
+ return keepReqs.Contains(r)
}
stealee.Id = intPeerId(1)
firstStealer := basePeer
}
}
results := Run(Input{Torrents: []Torrent{{
+ ChunksPerPiece: 9,
Pieces: []Piece{
{
Request: true,
}}})
c.Assert(results, qt.HasLen, 3)
- check := func(p PeerId, l int) {
- c.Check(results[p].Requests, qt.HasLen, l)
+ check := func(p PeerId, l uint64) {
+ checkResultsRequestsLen(t, results[p].Requests, l)
c.Check(results[p].Interested, qt.Equals, l > 0)
}
check(firstStealer.Id, 5)
check(secondStealer.Id, 7+9)
- c.Check(results[stealee.Id], qt.ContentEquals, PeerNextRequestState{
- Interested: true,
- Requests: requestSetFromSlice(r(4, 0), r(4, 1), r(4, 7), r(4, 8)),
- })
+ c.Check(
+ results[stealee.Id],
+ peerNextRequestStateChecker,
+ PeerNextRequestState{
+ Interested: true,
+ Requests: requestSetFromSlice(r(4, 0), r(4, 1), r(4, 7), r(4, 8)),
+ },
+ )
}
// This tests a situation where multiple peers had the same existing request, due to "actual" and
func TestDuplicatePreallocations(t *testing.T) {
peer := func(id int, downloadRate float64) Peer {
return Peer{
- HasExistingRequest: func(r Request) bool {
- return true
- },
- MaxRequests: 2,
+ HasExistingRequest: hasAllRequests,
+ MaxRequests: 2,
HasPiece: func(i pieceIndex) bool {
return true
},
}
results := Run(Input{
Torrents: []Torrent{{
+ ChunksPerPiece: 1,
Pieces: []Piece{{
Request: true,
NumPendingChunks: 1,
}},
})
c := qt.New(t)
- c.Assert(2, qt.Equals, len(results[intPeerId(1)].Requests)+len(results[intPeerId(2)].Requests))
+ req1 := results[intPeerId(1)].Requests
+ req2 := results[intPeerId(2)].Requests
+ c.Assert(uint64(2), qt.Equals, req1.GetCardinality()+req2.GetCardinality())
}
import (
"time"
+
+ "github.com/RoaringBitmap/roaring"
)
type PeerNextRequestState struct {
Interested bool
+ // Requests is a set of request indices (piece*chunksPerRegularPiece+chunk).
- Requests map[Request]struct{}
+ Requests roaring.Bitmap
}
type PeerId interface {
type Peer struct {
HasPiece func(i pieceIndex) bool
MaxRequests int
- HasExistingRequest func(r Request) bool
+ HasExistingRequest func(r RequestIndex) bool
Choking bool
PieceAllowedFast func(pieceIndex) bool
DownloadRate float64
package request_strategy
-import (
- "github.com/anacrolix/torrent/types"
-)
-
-type ChunksIter func(func(types.ChunkSpec))
+type ChunksIter func(func(ChunkIndex))
type Piece struct {
Request bool
IterPendingChunks ChunksIter
}
-func (p Piece) iterPendingChunksWrapper(f func(ChunkSpec)) {
+func (p Piece) iterPendingChunksWrapper(f func(ChunkIndex)) {
i := p.IterPendingChunks
if i != nil {
i(f)
// Unclosed Peers. Not necessary for getting requestable piece ordering.
Peers []Peer
// Some value that's unique and stable between runs. Could even use the infohash?
- InfoHash metainfo.Hash
+ InfoHash metainfo.Hash
+ ChunksPerPiece int
MaxUnverifiedBytes int64
}
"unsafe"
"github.com/anacrolix/missinggo/v2/bitmap"
- pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/chansync"
request_strategy "github.com/anacrolix/torrent/request-strategy"
func (cl *Client) getRequestStrategyInput() request_strategy.Input {
ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
for _, t := range cl.torrents {
+ if !t.haveInfo() {
+ // This would be removed if metadata is handled here.
+ continue
+ }
rst := request_strategy.Torrent{
- InfoHash: t.infoHash,
+ InfoHash: t.infoHash,
+ ChunksPerPiece: (t.usualPieceSize() + int(t.chunkSize) - 1) / int(t.chunkSize),
}
if t.storage != nil {
rst.Capacity = t.storage.Capacity
rst.Peers = append(rst.Peers, request_strategy.Peer{
HasPiece: p.peerHasPiece,
MaxRequests: p.nominalMaxRequests(),
- HasExistingRequest: func(r request_strategy.Request) bool {
- _, ok := p.actualRequestState.Requests[r]
- return ok
+ HasExistingRequest: func(r RequestIndex) bool {
+ return p.actualRequestState.Requests.Contains(r)
},
Choking: p.peerChoking,
PieceAllowedFast: func(i pieceIndex) bool {
p.onNextRequestStateChanged()
}
+// Aliases into the request-strategy package so both packages share the same
+// flat request-index representation.
+type RequestIndex = request_strategy.RequestIndex
+type chunkIndexType = request_strategy.ChunkIndex
+
func (p *Peer) applyNextRequestState() bool {
- if len(p.actualRequestState.Requests) > p.nominalMaxRequests()/2 {
+ if p.actualRequestState.Requests.GetCardinality() > uint64(p.nominalMaxRequests()/2) {
return true
}
type piece struct {
for _, endGameIter := range []bool{false, true} {
for _, piece := range pieceOrder {
tp := p.t.piece(piece.index)
- tp.iterUndirtiedChunks(func(cs ChunkSpec) {
- req := Request{pp.Integer(piece.index), cs}
+ tp.iterUndirtiedChunks(func(cs chunkIndexType) {
+ req := cs + tp.requestIndexOffset()
if !piece.endGame && !endGameIter && p.t.pendingRequests[req] > 0 {
return
}
if !more {
return
}
- if len(p.actualRequestState.Requests) >= p.nominalMaxRequests() {
+ if maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
return
}
- if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req.Index)) {
+ if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(piece.index)) {
return
}
var err error
panic(err)
}
})
- if interested && len(p.actualRequestState.Requests) >= p.nominalMaxRequests() {
+ if interested && maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
break
}
if !more {
connPieceInclinationPool sync.Pool
// Count of each request across active connections.
- pendingRequests map[Request]int
+ pendingRequests map[RequestIndex]int
pex pexState
}
t.cl.event.Broadcast()
close(t.gotMetainfoC)
t.updateWantPeersEvent()
- t.pendingRequests = make(map[Request]int)
+ t.pendingRequests = make(map[RequestIndex]int)
t.tryCreateMorePieceHashers()
}
return
}
-func (t *Torrent) pieceNumChunks(piece pieceIndex) pp.Integer {
- return (t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize
+// chunksPerRegularPiece is the chunk count of a non-final piece
+// (usualPieceSize), used as the stride for flat request indices.
+func (t *Torrent) chunksPerRegularPiece() uint32 {
+ return uint32((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
+}
+
+// pieceNumChunks is the actual chunk count of the given piece; it can be
+// smaller than chunksPerRegularPiece for the final piece (pieceLength).
+func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
+ return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
}
func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
}
+// chunkIndexFromChunkSpec converts a protocol chunk spec's byte offset into a
+// piece-relative chunk index.
-func chunkIndex(cs ChunkSpec, chunkSize pp.Integer) int {
- return int(cs.Begin / chunkSize)
+func chunkIndexFromChunkSpec(cs ChunkSpec, chunkSize pp.Integer) chunkIndexType {
+ return chunkIndexType(cs.Begin / chunkSize)
}
func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
if t.pieceComplete(piece) {
return 0
}
- return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
+ return pp.Integer(t.pieceNumChunks(piece)) - t.pieces[piece].numDirtyChunks()
}
func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
return ret
}
+// pendRequest marks the chunk for req as wanted again. Division recovers the
+// piece; the remainder is the chunk index within that piece.
-func (t *Torrent) pendRequest(req Request) {
- ci := chunkIndex(req.ChunkSpec, t.chunkSize)
- t.pieces[req.Index].pendChunkIndex(ci)
+func (t *Torrent) pendRequest(req RequestIndex) {
+ t.piece(int(req / t.chunksPerRegularPiece())).pendChunkIndex(req % t.chunksPerRegularPiece())
}
func (t *Torrent) pieceCompletionChanged(piece pieceIndex) {
})
return
}
+
+// requestIndexToRequest converts a flat request index back to a protocol
+// Request (piece index plus chunk spec; the piece supplies the chunk's
+// byte offset and length).
+func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request {
+ index := ri / t.chunksPerRegularPiece()
+ return Request{
+ pp.Integer(index),
+ t.piece(int(index)).chunkIndexSpec(pp.Integer(ri % t.chunksPerRegularPiece())),
+ }
+}
+
+// requestIndexFromRequest is the inverse of requestIndexToRequest:
+// piece*chunksPerRegularPiece + Begin/chunkSize.
+func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
+ return t.chunksPerRegularPiece()*uint32(r.Index) + uint32(r.Begin/t.chunkSize)
+}
+
+// numChunks is the total chunk count over the whole torrent, rounding the
+// final partial chunk up.
+func (t *Torrent) numChunks() RequestIndex {
+ return RequestIndex((t.Length() + int64(t.chunkSize) - 1) / int64(t.chunkSize))
+}
defer ws.requesterCond.L.Unlock()
start:
for !ws.peer.closed.IsSet() {
- for r := range ws.peer.actualRequestState.Requests {
+ restart := false
+ ws.peer.actualRequestState.Requests.Iterate(func(x uint32) bool {
+ r := ws.peer.t.requestIndexToRequest(x)
if _, ok := ws.activeRequests[r]; ok {
- continue
+ return true
}
ws.doRequest(r)
+ restart = true
+ return false
+ })
+ if restart {
goto start
}
ws.requesterCond.Wait()
}() {
ws.peer.close()
} else {
- ws.peer.remoteRejectedRequest(r)
+ ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r))
}
} else {
err := ws.peer.receiveChunk(&pp.Message{