From 7d4f64ce3ca7783e6f30935cc1bbe1f9668ce684 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 9 May 2022 19:34:43 +1000 Subject: [PATCH] Combine pending and last requested --- peerconn.go | 20 ++++++++++---------- requesting.go | 10 ++++++---- torrent.go | 23 +++++++++++++++-------- 3 files changed, 31 insertions(+), 22 deletions(-) diff --git a/peerconn.go b/peerconn.go index 08b1814b..fea679dd 100644 --- a/peerconn.go +++ b/peerconn.go @@ -48,11 +48,10 @@ type PeerRemoteAddr interface { String() string } -// Since we have to store all the requests in memory, we can't reasonably exceed what would be -// indexable with the memory space available. type ( - maxRequests = int - requestState = request_strategy.PeerRequestState + // Since we have to store all the requests in memory, we can't reasonably exceed what could be + // indexed with the memory space available. + maxRequests = int ) type Peer struct { @@ -85,7 +84,7 @@ type Peer struct { // Stuff controlled by the local peer. needRequestUpdate string - requestState requestState + requestState request_strategy.PeerRequestState updateRequestsTimer *time.Timer lastRequestUpdate time.Time peakRequests maxRequests @@ -637,8 +636,10 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) { cn.validReceiveChunks = make(map[RequestIndex]int) } cn.validReceiveChunks[r]++ - cn.t.pendingRequests[r] = cn - cn.t.lastRequested[r] = time.Now() + cn.t.requestState[r] = requestState{ + peer: cn, + when: time.Now(), + } cn.updateExpectingChunks() ppReq := cn.t.requestIndexToRequest(r) for _, f := range cn.callbacks.SentRequest { @@ -1464,7 +1465,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { piece.unpendChunkIndex(chunkIndexFromChunkSpec(ppReq.ChunkSpec, t.chunkSize)) // Cancel pending requests for this chunk from *other* peers. - if p := t.pendingRequests[req]; p != nil { + if p := t.requestingPeer(req); p != nil { if p == c { panic("should not be pending request from conn that just received it") } @@ -1627,8 +1628,7 @@ func (c *Peer) deleteRequest(r RequestIndex) bool { if c.t.requestingPeer(r) != c { panic("only one peer should have a given request at a time") } - delete(c.t.pendingRequests, r) - delete(c.t.lastRequested, r) + delete(c.t.requestState, r) // c.t.iterPeers(func(p *Peer) { // if p.isLowOnRequests() { // p.updateRequests("Peer.deleteRequest") diff --git a/requesting.go b/requesting.go index 411fac53..7b6152fd 100644 --- a/requesting.go +++ b/requesting.go @@ -118,8 +118,10 @@ func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex if ml.Ok() { return ml.MustLess() } - leftPeer := t.pendingRequests[leftRequest] - rightPeer := t.pendingRequests[rightRequest] + leftRequestState := t.requestState[leftRequest] + rightRequestState := t.requestState[rightRequest] + leftPeer := leftRequestState.peer + rightPeer := rightRequestState.peer // Prefer chunks already requested from this peer. ml = ml.Bool(rightPeer == p.peer, leftPeer == p.peer) // Prefer unrequested chunks. @@ -134,8 +136,8 @@ func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex leftPeer.requestState.Requests.GetCardinality(), ) // Could either of the lastRequested be Zero? That's what checking an existing peer is for. - leftLast := t.lastRequested[leftRequest] - rightLast := t.lastRequested[rightRequest] + leftLast := leftRequestState.when + rightLast := rightRequestState.when if leftLast.IsZero() || rightLast.IsZero() { panic("expected non-zero last requested times") } diff --git a/torrent.go b/torrent.go index 81e80ddb..3e91e736 100644 --- a/torrent.go +++ b/torrent.go @@ -140,9 +140,8 @@ type Torrent struct { initialPieceCheckDisabled bool connsWithAllPieces map[*Peer]struct{} - // Count of each request across active connections. - pendingRequests map[RequestIndex]*Peer - lastRequested map[RequestIndex]time.Time + + requestState map[RequestIndex]requestState // Chunks we've written to since the corresponding piece was last checked. dirtyChunks typedRoaring.Bitmap[RequestIndex] @@ -472,8 +471,7 @@ func (t *Torrent) onSetInfo() { t.cl.event.Broadcast() close(t.gotMetainfoC) t.updateWantPeersEvent() - t.pendingRequests = make(map[RequestIndex]*Peer) - t.lastRequested = make(map[RequestIndex]time.Time) + t.requestState = make(map[RequestIndex]requestState) t.tryCreateMorePieceHashers() t.iterPeers(func(p *Peer) { p.onGotInfo(t.info) @@ -2453,16 +2451,20 @@ func (t *Torrent) updateComplete() { } func (t *Torrent) cancelRequest(r RequestIndex) *Peer { - p := t.pendingRequests[r] + p := t.requestingPeer(r) if p != nil { p.cancel(r) } - delete(t.pendingRequests, r) + // TODO: This is a check that an old invariant holds. It can be removed after some testing. + //delete(t.pendingRequests, r) + if _, ok := t.requestState[r]; ok { + panic("expected request state to be gone") + } return p } func (t *Torrent) requestingPeer(r RequestIndex) *Peer { - return t.pendingRequests[r] + return t.requestState[r].peer } func (t *Torrent) addConnWithAllPieces(p *Peer) { @@ -2494,3 +2496,8 @@ func (t *Torrent) hasStorageCap() bool { func (t *Torrent) pieceIndexOfRequestIndex(ri RequestIndex) pieceIndex { return pieceIndex(ri / t.chunksPerRegularPiece()) } + +type requestState struct { + peer *Peer + when time.Time +} -- 2.44.0