ReceiveEncryptedHandshakeSkeys mse.SecretKeyIter
ReceivedUsefulData []func(ReceivedUsefulDataEvent)
+ ReceivedRequested []func(PeerMessageEvent)
+ DeletedRequest []func(PeerRequestEvent)
+ SentRequest []func(PeerRequestEvent)
+ PeerClosed []func(*Peer)
+ NewPeer []func(*Peer)
}
-type ReceivedUsefulDataEvent struct {
+type ReceivedUsefulDataEvent = PeerMessageEvent
+
+type PeerMessageEvent struct {
Peer *Peer
Message *pp.Message
}
+
+type PeerRequestEvent struct {
+ Peer *Peer
+ Request
+}
RemoteAddr: remoteAddr,
Network: network,
+ callbacks: &cl.config.Callbacks,
},
connString: connString,
conn: nc,
writeBuffer: new(bytes.Buffer),
- callbacks: &cl.config.Callbacks,
}
c.peerImpl = c
c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
r: c.r,
}
c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
+ for _, f := range cl.config.Callbacks.NewPeer {
+ f(&c.Peer)
+ }
return
}
tor.cl.lock()
assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
tor.cl.unlock()
- assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
+ assert.EqualValues(t, ChunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
}
func TestReducedDialTimeout(t *testing.T) {
pp "github.com/anacrolix/torrent/peer_protocol"
)
-type chunkSpec struct {
+type ChunkSpec struct {
Begin, Length pp.Integer
}
-type request struct {
+type Request struct {
Index pp.Integer
- chunkSpec
+ ChunkSpec
}
-func (r request) ToMsg(mt pp.MessageType) pp.Message {
+func (r Request) ToMsg(mt pp.MessageType) pp.Message {
return pp.Message{
Type: mt,
Index: r.Index,
}
}
-func newRequest(index, begin, length pp.Integer) request {
- return request{index, chunkSpec{begin, length}}
+func newRequest(index, begin, length pp.Integer) Request {
+ return Request{index, ChunkSpec{begin, length}}
}
-func newRequestFromMessage(msg *pp.Message) request {
+func newRequestFromMessage(msg *pp.Message) Request {
switch msg.Type {
case pp.Request, pp.Cancel, pp.Reject:
return newRequest(msg.Index, msg.Begin, msg.Length)
// Return the request that would include the given offset into the torrent data.
func torrentOffsetRequest(torrentLength, pieceSize, chunkSize, offset int64) (
- r request, ok bool) {
+ r Request, ok bool) {
if offset < 0 || offset >= torrentLength {
return
}
return
}
-func torrentRequestOffset(torrentLength, pieceSize int64, r request) (off int64) {
+func torrentRequestOffset(torrentLength, pieceSize int64, r Request) (off int64) {
off = int64(r.Index)*pieceSize + int64(r.Begin)
if off < 0 || off >= torrentLength {
panic("invalid request")
}
return
}
return nil
}
-func chunkIndexSpec(index pp.Integer, pieceLength, chunkSize pp.Integer) chunkSpec {
- ret := chunkSpec{pp.Integer(index) * chunkSize, chunkSize}
+func chunkIndexSpec(index pp.Integer, pieceLength, chunkSize pp.Integer) ChunkSpec {
+ ret := ChunkSpec{pp.Integer(index) * chunkSize, chunkSize}
if ret.Begin+ret.Length > pieceLength {
ret.Length = pieceLength - ret.Begin
}
)
func TestTorrentOffsetRequest(t *testing.T) {
- check := func(tl, ps, off int64, expected request, ok bool) {
+ check := func(tl, ps, off int64, expected Request, ok bool) {
req, _ok := torrentOffsetRequest(tl, ps, defaultChunkSize, off)
assert.Equal(t, _ok, ok)
assert.Equal(t, req, expected)
check(13, 5, 0, newRequest(0, 0, 5), true)
check(13, 5, 3, newRequest(0, 0, 5), true)
check(13, 5, 11, newRequest(2, 0, 3), true)
- check(13, 5, 13, request{}, false)
+ check(13, 5, 13, Request{}, false)
}
func TestIterBitmapsDistinct(t *testing.T) {
type peerImpl interface {
updateRequests()
writeInterested(interested bool) bool
- cancel(request) bool
+ cancel(Request) bool
// Return true if there's room for more activity.
- request(request) bool
+ request(Request) bool
connectionFlags() string
onClose()
- _postCancel(request)
+ _postCancel(Request)
onGotInfo(*metainfo.Info)
drop()
String() string
t *Torrent
peerImpl
+ callbacks *Callbacks
outgoing bool
Network string
_chunksReceivedWhileExpecting int64
choking bool
- requests map[request]struct{}
+ requests map[Request]struct{}
requestsLowWater int
// Chunks that we might reasonably expect to receive from the peer. Due to
// latency, buffering, and implementation differences, we may receive
// chunks that are no longer in the set of requests actually want.
- validReceiveChunks map[request]int
+ validReceiveChunks map[Request]int
// Indexed by metadata piece, set to true if posted and pending a
// response.
metadataRequests []bool
// Stuff controlled by the remote peer.
peerInterested bool
peerChoking bool
- peerRequests map[request]*peerRequestState
+ peerRequests map[Request]*peerRequestState
PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake
PeerListenPort int
// The pieces the peer has claimed to have.
writerCond sync.Cond
pex pexConnState
-
- callbacks *Callbacks
}
func (cn *PeerConn) connStatusString() string {
cn.discardPieceInclination()
cn._pieceRequestOrder.Clear()
cn.peerImpl.onClose()
+ for _, f := range cn.callbacks.PeerClosed {
+ f(cn)
+ }
}
func (cn *PeerConn) onClose() {
}
-func (cn *PeerConn) onPeerSentCancel(r request) {
+func (cn *PeerConn) onPeerSentCancel(r Request) {
if _, ok := cn.peerRequests[r]; !ok {
torrent.Add("unexpected cancels received", 1)
return
// are okay.
type messageWriter func(pp.Message) bool
-func (cn *Peer) request(r request) bool {
+func (cn *Peer) request(r Request) bool {
if _, ok := cn.requests[r]; ok {
panic("chunk already requested")
}
panic("piece is queued for hash")
}
if cn.requests == nil {
- cn.requests = make(map[request]struct{})
+ cn.requests = make(map[Request]struct{})
}
cn.requests[r] = struct{}{}
if cn.validReceiveChunks == nil {
- cn.validReceiveChunks = make(map[request]int)
+ cn.validReceiveChunks = make(map[Request]int)
}
cn.validReceiveChunks[r]++
cn.t.pendingRequests[r]++
cn.t.requestStrategy.hooks().sentRequest(r)
cn.updateExpectingChunks()
+ for _, f := range cn.callbacks.SentRequest {
+ f(PeerRequestEvent{cn, r})
+ }
return cn.peerImpl.request(r)
}
-func (me *PeerConn) request(r request) bool {
+func (me *PeerConn) request(r Request) bool {
return me.write(pp.Message{
Type: pp.Request,
Index: r.Index,
})
}
-func (me *PeerConn) cancel(r request) bool {
+func (me *PeerConn) cancel(r Request) bool {
return me.write(makeCancelMessage(r))
}
} else if len(cn.requests) <= cn.requestsLowWater {
filledBuffer := false
cn.iterPendingPieces(func(pieceIndex pieceIndex) bool {
- cn.iterPendingRequests(pieceIndex, func(r request) bool {
+ cn.iterPendingRequests(pieceIndex, func(r Request) bool {
if !cn.setInterested(true) {
filledBuffer = true
return false
cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) })
}
-func (cn *Peer) iterPendingRequests(piece pieceIndex, f func(request) bool) bool {
+func (cn *Peer) iterPendingRequests(piece pieceIndex, f func(Request) bool) bool {
return cn.t.requestStrategy.iterUndirtiedChunks(
cn.t.piece(piece).requestStrategyPiece(),
- func(cs chunkSpec) bool {
- return f(request{pp.Integer(piece), cs})
+ func(cs ChunkSpec) bool {
+ return f(Request{pp.Integer(piece), cs})
},
)
}
return c.PeerExtensionBytes.SupportsFast() && c.t.cl.config.Extensions.SupportsFast()
}
-func (c *PeerConn) reject(r request) {
+func (c *PeerConn) reject(r Request) {
if !c.fastEnabled() {
panic("fast not enabled")
}
delete(c.peerRequests, r)
}
-func (c *PeerConn) onReadRequest(r request) error {
+func (c *PeerConn) onReadRequest(r Request) error {
requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
if _, ok := c.peerRequests[r]; ok {
torrent.Add("duplicate requests received", 1)
// Check this after we know we have the piece, so that the piece length will be known.
if r.Begin+r.Length > c.t.pieceLength(pieceIndex(r.Index)) {
torrent.Add("bad requests received", 1)
return errors.New("bad request")
}
if c.peerRequests == nil {
- c.peerRequests = make(map[request]*peerRequestState, maxRequests)
+ c.peerRequests = make(map[Request]*peerRequestState, maxRequests)
}
value := &peerRequestState{}
c.peerRequests[r] = value
return nil
}
-func (c *PeerConn) peerRequestDataReader(r request, prs *peerRequestState) {
+func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
b, err := readPeerRequestData(r, c)
c.locker().Lock()
defer c.locker().Unlock()
// If this is maintained correctly, we might be able to support optional synchronous reading for
// chunk sending, the way it used to work.
-func (c *PeerConn) peerRequestDataReadFailed(err error, r request) {
+func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) {
c.logger.WithDefaultLevel(log.Warning).Printf("error reading chunk for peer request %v: %v", r, err)
i := pieceIndex(r.Index)
if c.t.pieceComplete(i) {
// There used to be more code here that just duplicated the following break. Piece
c.choke(c.post)
}
-func readPeerRequestData(r request, c *PeerConn) ([]byte, error) {
+func readPeerRequestData(r Request, c *PeerConn) ([]byte, error) {
b := make([]byte, r.Length)
p := c.t.info.Piece(int(r.Index))
n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
}
}
-func (c *Peer) remoteRejectedRequest(r request) {
+func (c *Peer) remoteRejectedRequest(r Request) {
if c.deleteRequest(r) {
c.decExpectedChunkReceive(r)
}
}
-func (c *Peer) decExpectedChunkReceive(r request) {
+func (c *Peer) decExpectedChunkReceive(r Request) {
count := c.validReceiveChunks[r]
if count == 1 {
delete(c.validReceiveChunks, r)
piece.incrementPendingWrites()
// Record that we have the chunk, so we aren't trying to download it while
// waiting for it to be written to storage.
- piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
+ piece.unpendChunkIndex(chunkIndex(req.ChunkSpec, t.chunkSize))
// Cancel pending requests for this chunk.
for c := range t.conns {
return len(c.requests)
}
-func (c *Peer) deleteRequest(r request) bool {
+func (c *Peer) deleteRequest(r Request) bool {
if _, ok := c.requests[r]; !ok {
return false
}
delete(c.requests, r)
+ for _, f := range c.callbacks.DeletedRequest {
+ f(PeerRequestEvent{c, r})
+ }
c.updateExpectingChunks()
c.t.requestStrategy.hooks().deletedRequest(r)
pr := c.t.pendingRequests
c.writerCond.Broadcast()
}
-func (c *Peer) postCancel(r request) bool {
+func (c *Peer) postCancel(r Request) bool {
if !c.deleteRequest(r) {
return false
}
return true
}
-func (c *PeerConn) _postCancel(r request) {
+func (c *PeerConn) _postCancel(r Request) {
c.post(makeCancelMessage(r))
}
-func (c *PeerConn) sendChunk(r request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
+func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
c.lastChunkSent = time.Now()
return msg(pp.Message{
Type: pp.Piece,
// The chunk must be written to storage everytime, to ensure the
// writeSem is unlocked.
t.pieces[0]._dirtyChunks.Clear()
- cn.validReceiveChunks = map[request]int{newRequestFromMessage(&msg): 1}
+ cn.validReceiveChunks = map[Request]int{newRequestFromMessage(&msg): 1}
cl.unlock()
n, err := w.Write(wb)
require.NoError(b, err)
return !p._dirtyChunks.Contains(chunkIndex)
}
-func (p *Piece) pendingChunk(cs chunkSpec, chunkSize pp.Integer) bool {
+func (p *Piece) pendingChunk(cs ChunkSpec, chunkSize pp.Integer) bool {
return p.pendingChunkIndex(chunkIndex(cs, chunkSize))
}
return p._dirtyChunks.Contains(bitmap.BitIndex(chunk))
}
-func (p *Piece) chunkIndexSpec(chunk pp.Integer) chunkSpec {
+func (p *Piece) chunkIndexSpec(chunk pp.Integer) ChunkSpec {
return chunkIndexSpec(chunk, p.length(), p.chunkSize())
}
-func (p *Piece) chunkIndexRequest(chunkIndex pp.Integer) request {
- return request{
+func (p *Piece) chunkIndexRequest(chunkIndex pp.Integer) Request {
+ return Request{
pp.Integer(p.index),
chunkIndexSpec(chunkIndex, p.length(), p.chunkSize()),
}
pp "github.com/anacrolix/torrent/peer_protocol"
)
-func makeCancelMessage(r request) pp.Message {
+func makeCancelMessage(r Request) pp.Message {
return pp.MakeCancelMessage(r.Index, r.Begin, r.Length)
}
func (requestStrategyDefaults) hooks() requestStrategyHooks {
return requestStrategyHooks{
- sentRequest: func(request) {},
- deletedRequest: func(request) {},
+ sentRequest: func(Request) {},
+ deletedRequest: func(Request) {},
}
}
-func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
+func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
chunkIndices := p.dirtyChunks().Copy()
chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
if err != nil {
panic(err)
}
- return f(p.chunkIndexRequest(pp.Integer(ci)).chunkSpec)
+ return f(p.chunkIndexRequest(pp.Integer(ci)).ChunkSpec)
})
}
type requestStrategyPiece interface {
numChunks() pp.Integer
dirtyChunks() bitmap.Bitmap
- chunkIndexRequest(i pp.Integer) request
+ chunkIndexRequest(i pp.Integer) Request
}
type requestStrategyTorrent interface {
type requestStrategy interface {
iterPendingPieces(requestStrategyConnection, func(pieceIndex) bool) bool
- iterUndirtiedChunks(requestStrategyPiece, func(chunkSpec) bool) bool
+ iterUndirtiedChunks(requestStrategyPiece, func(ChunkSpec) bool) bool
nominalMaxRequests(requestStrategyConnection) int
shouldRequestWithoutBias(requestStrategyConnection) bool
piecePriority(requestStrategyConnection, pieceIndex, piecePriority, int) int
}
type requestStrategyHooks struct {
- sentRequest func(request)
- deletedRequest func(request)
+ sentRequest func(Request)
+ deletedRequest func(Request)
}
type requestStrategyCallbacks interface {
- requestTimedOut(request)
+ requestTimedOut(Request)
}
type requestStrategyFuzzing struct {
// The last time we requested a chunk. Deleting the request from any connection will clear this
// value.
- lastRequested map[request]*time.Timer
+ lastRequested map[Request]*time.Timer
// The lock to take when running a request timeout handler.
timeoutLocker sync.Locker
}
return requestStrategyDuplicateRequestTimeout{
duplicateRequestTimeout: duplicateRequestTimeout,
callbacks: callbacks,
- lastRequested: make(map[request]*time.Timer),
+ lastRequested: make(map[Request]*time.Timer),
timeoutLocker: clientLocker,
}
}
func (rs requestStrategyDuplicateRequestTimeout) hooks() requestStrategyHooks {
return requestStrategyHooks{
- deletedRequest: func(r request) {
+ deletedRequest: func(r Request) {
if t, ok := rs.lastRequested[r]; ok {
t.Stop()
delete(rs.lastRequested, r)
}
}
-func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
+func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ {
if p.dirtyChunks().Get(bitmap.BitIndex(i)) {
continue
if rs.wouldDuplicateRecent(r) {
continue
}
- if !f(r.chunkSpec) {
+ if !f(r.ChunkSpec) {
return false
}
}
return defaultIterPendingPieces(rs, cn, cb)
}
-func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r request) {
+func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r Request) {
rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() {
rs.timeoutLocker.Lock()
delete(rs.lastRequested, r)
),
))
}
-func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r request) bool {
+func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r Request) bool {
// This piece has been requested on another connection, and the duplicate request timer is still
// running.
_, ok := rs.lastRequested[r]
connPieceInclinationPool sync.Pool
// Count of each request across active connections.
- pendingRequests map[request]int
+ pendingRequests map[Request]int
pex pexState
}
t.cl.event.Broadcast()
t.gotMetainfo.Set()
t.updateWantPeersEvent()
- t.pendingRequests = make(map[request]int)
+ t.pendingRequests = make(map[Request]int)
t.tryCreateMorePieceHashers()
}
return
}
-func (t *Torrent) requestOffset(r request) int64 {
+func (t *Torrent) requestOffset(r Request) int64 {
return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
}
// Return the request that would include the given offset into the torrent data. Returns !ok if
// there is no such request.
-func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
+func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
}
p.drop()
}
-func (t *Torrent) haveChunk(r request) (ret bool) {
+func (t *Torrent) haveChunk(r Request) (ret bool) {
// defer func() {
// log.Println("have chunk", r, ret)
// }()
return true
}
p := &t.pieces[r.Index]
- return !p.pendingChunk(r.chunkSpec, t.chunkSize)
+ return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
}
-func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
+func chunkIndex(cs ChunkSpec, chunkSize pp.Integer) int {
return int(cs.Begin / chunkSize)
}
return ret
}
-func (t *Torrent) pendRequest(req request) {
- ci := chunkIndex(req.chunkSpec, t.chunkSize)
+func (t *Torrent) pendRequest(req Request) {
+ ci := chunkIndex(req.ChunkSpec, t.chunkSize)
t.pieces[req.Index].pendChunkIndex(ci)
}
t *Torrent
}
-func (cb torrentRequestStrategyCallbacks) requestTimedOut(r request) {
+func (cb torrentRequestStrategyCallbacks) requestTimedOut(r Request) {
torrent.Add("request timeouts", 1)
cb.t.cl.lock()
defer cb.t.cl.unlock()
cb.t.iterPeers(func(cn *Peer) {
}
}
+func (t *Torrent) callbacks() *Callbacks {
+ return &t.cl.config.Callbacks
+}
+
func (t *Torrent) addWebSeed(url string) {
if t.cl.config.DisableWebseeds {
return
HttpClient: http.DefaultClient,
Url: url,
},
- requests: make(map[request]webseed.Request, maxRequests),
+ requests: make(map[Request]webseed.Request, maxRequests),
+ }
+ for _, f := range t.callbacks().NewPeer {
+ f(&ws.peer)
}
ws.peer.logger = t.logger.WithContextValue(&ws)
ws.peer.peerImpl = &ws
"github.com/anacrolix/torrent/storage"
)
-func r(i, b, l pp.Integer) request {
- return request{i, chunkSpec{b, l}}
+func r(i, b, l pp.Integer) Request {
+ return Request{i, ChunkSpec{b, l}}
}
// Check the given request is correct for various torrent offsets.
const s = 472183431 // Length of torrent.
for _, _case := range []struct {
off int64 // An offset into the torrent.
- req request // The expected request. The zero value means !ok.
+ req Request // The expected request. The zero value means !ok.
}{
// Invalid offset.
- {-1, request{}},
+ {-1, Request{}},
{0, r(0, 0, 16384)},
// One before the end of a piece.
{1<<18 - 1, r(0, 1<<18-16384, 16384)},
// Offset beyond torrent length.
- {472 * 1 << 20, request{}},
+ {472 * 1 << 20, Request{}},
// One before the end of the torrent. Complicates the chunk length.
{s - 1, r((s-1)/(1<<18), (s-1)%(1<<18)/(16384)*(16384), 12935)},
{1, r(0, 0, 16384)},
{16384, r(0, 16384, 16384)},
} {
req, ok := torrentOffsetRequest(472183431, 1<<18, 16384, _case.off)
- if (_case.req == request{}) == ok {
+ if (_case.req == Request{}) == ok {
t.Fatalf("expected %v, got %v", _case.req, req)
}
if req != _case.req {
type webseedPeer struct {
client webseed.Client
- requests map[request]webseed.Request
+ requests map[Request]webseed.Request
peer Peer
}
ws.client.Info = info
}
-func (ws *webseedPeer) _postCancel(r request) {
+func (ws *webseedPeer) _postCancel(r Request) {
ws.cancel(r)
}
return true
}
-func (ws *webseedPeer) cancel(r request) bool {
+func (ws *webseedPeer) cancel(r Request) bool {
ws.requests[r].Cancel()
return true
}
-func (ws *webseedPeer) intoSpec(r request) webseed.RequestSpec {
+func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
}
-func (ws *webseedPeer) request(r request) bool {
+func (ws *webseedPeer) request(r Request) bool {
webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
ws.requests[r] = webseedRequest
go ws.requestResultHandler(r, webseedRequest)
func (ws *webseedPeer) onClose() {}
-func (ws *webseedPeer) requestResultHandler(r request, webseedRequest webseed.Request) {
+func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) {
result := <-webseedRequest.Result
ws.peer.t.cl.lock()
defer ws.peer.t.cl.unlock()
if result.Err != nil {
ws.peer.logger.Printf("request %v rejected: %v", r, result.Err)
// Always close for now. We need to filter out temporary errors, but this is a nightmare in
// Go. Currently a bad webseed URL can starve out the good ones due to the chunk selection
// algorithm.