// Actually go ahead and modify the pending requests.
updateRequests()
- // handleCancel initiates cancellation of a request and returns acked if it expects the cancel
- // to be handled by a follow-up event.
+ // handleCancel initiates cancellation of a request.
handleCancel(RequestIndex)
- acksCancels() bool
// The final step that actually commits to a request. Typically this sends or begins
// handling the request.
_request(Request) bool
// Abstract methods implemented by subclasses of Peer.
type newHotPeerImpl interface {
lastWriteUploadRate() float64
- // How many requests should be assigned to the peer.
- nominalMaxRequests() maxRequests
checkReceivedChunk(ri RequestIndex) error
+ expectingChunks() bool
}
)
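+// For illustration only: a hypothetical minimal implementer of newHotPeerImpl
+// might look like the following sketch (the type and field are invented here,
+// not part of this change):
+//
+//	type examplePeerImpl struct{ peer *Peer }
+//
+//	func (me *examplePeerImpl) lastWriteUploadRate() float64             { return 0 }
+//	func (me *examplePeerImpl) expectingChunks() bool                    { return false }
+//	func (me *examplePeerImpl) checkReceivedChunk(ri RequestIndex) error { return nil }
+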
type (
+ // Generic Peer-like fields. Could be WebSeed, BitTorrent over TCP, uTP or WebRTC.
Peer struct {
// First to ensure 64-bit alignment for atomics. See #262.
_stats ConnStats
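+ // (sync/atomic requires 64-bit alignment for 64-bit operations on 32-bit
+ // platforms, and the first word of an allocated struct is guaranteed to be
+ // 64-bit aligned, hence this field comes first.)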
}
func (cn *Peer) updateExpectingChunks() {
- if cn.expectingChunks() {
+ if cn.peerImpl.expectingChunks() {
if cn.lastStartedExpectingToReceiveChunks.IsZero() {
cn.lastStartedExpectingToReceiveChunks = time.Now()
}
}
}
-func (cn *Peer) expectingChunks() bool {
+func (cn *PeerConn) expectingChunks() bool {
if cn.requestState.Requests.IsEmpty() {
return false
}
cn.totalExpectingTime(),
)
fmt.Fprintf(w,
- "%s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
+ "%s completed, %d pieces touched, good chunks: %v/%v:%v dr: %.1f KiB/s\n",
cn.completedString(),
len(cn.peerTouchedPieces),
&cn._stats.ChunksReadUseful,
&cn._stats.ChunksRead,
&cn._stats.ChunksWritten,
- cn.requestState.Requests.GetCardinality(),
- cn.requestState.Cancelled.GetCardinality(),
- cn.peerImpl.nominalMaxRequests(),
- cn.PeerMaxRequests,
- len(cn.peerRequests),
- localClientReqq,
- cn.statusFlags(),
cn.downloadRate()/(1<<10),
)
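+ // For example (values illustrative):
+ //   "1234/2048 completed, 12 pieces touched, good chunks: 3210/3300:57 dr: 412.3 KiB/s"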
fmt.Fprintf(w, "requested pieces:")
if cn.requestState.Requests.Contains(r) {
return true, nil
}
- if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.peerImpl.nominalMaxRequests() {
+ if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
return true, errors.New("too many outstanding requests")
}
cn.requestState.Requests.Add(r)
panic("request not existing should have been guarded")
}
me.handleCancel(r)
- if me.acksCancels() {
- // Record that we expect to get a cancel ack.
- if !me.requestState.Cancelled.CheckedAdd(r) {
- panic("request already cancelled")
- }
- }
me.decPeakRequests()
if me.isLowOnRequests() {
me.onNeedUpdateRequests(peerUpdateRequestsPeerCancelReason)
return nil
}
- piece := &t.pieces[ppReq.Index]
+ piece := t.piece(ppReq.Index.Int())
c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
fmt.Sprintf("extensions: %v", cn.PeerExtensionBytes),
fmt.Sprintf("ltep extensions: %v", cn.PeerExtensionIDs),
fmt.Sprintf("pex: %s", cn.pexStatus()),
+ fmt.Sprintf(
+ "reqq: %d+%v/(%d/%d):%d/%d, flags: %s",
+ cn.requestState.Requests.GetCardinality(),
+ cn.requestState.Cancelled.GetCardinality(),
+ cn.nominalMaxRequests(),
+ cn.PeerMaxRequests,
+ len(cn.peerRequests),
+ localClientReqq,
+ cn.statusFlags(),
+ ),
}
}
func (me *PeerConn) handleCancel(r RequestIndex) {
me.write(makeCancelMessage(me.t.requestIndexToRequest(r)))
-}
-
-func (me *PeerConn) acksCancels() bool {
- return me.remoteRejectsCancels()
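+ // With the fast extension (BEP 6), a peer that receives a cancel must
+ // answer with either the piece or a reject, so park the request in
+ // Cancelled until that ack arrives.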
+ if me.remoteRejectsCancels() {
+ // Record that we expect to get a cancel ack.
+ if !me.requestState.Cancelled.CheckedAdd(r) {
+ panic("request already cancelled")
+ }
+ }
}
// Whether we should expect a reject message after sending a cancel.
break
}
numPending := maxRequests(current.Requests.GetCardinality() + current.Cancelled.GetCardinality())
- if numPending >= p.peerImpl.nominalMaxRequests() {
+ if numPending >= p.nominalMaxRequests() {
break
}
req := heap.Pop(requestHeap)
}
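+// Whether we already have the given chunk: trivially true when its piece is
+// complete, otherwise read from the piece's pending-chunk state.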
func (t *Torrent) haveChunk(r Request) (ret bool) {
- // defer func() {
- // log.Println("have chunk", r, ret)
- // }()
if !t.haveInfo() {
return false
}
if t.pieceComplete(pieceIndex(r.Index)) {
return true
}
- p := &t.pieces[r.Index]
+ p := t.piece(int(r.Index))
return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
}
type AddWebSeedsOpt func(*webseed.Client)
+// TODO: Add a webseed http.Client option.
+
// Max concurrent requests to a WebSeed for a given torrent.
func WebSeedTorrentMaxRequests(maxRequests int) AddWebSeedsOpt {
return func(c *webseed.Client) {
hostKey webseedHostKeyHandle
}
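+// A hedged usage sketch for WebSeedTorrentMaxRequests (assuming
+// Torrent.AddWebSeeds takes variadic AddWebSeedsOpt; URL invented):
+//
+//	t.AddWebSeeds([]string{"https://example.com/files/"}, WebSeedTorrentMaxRequests(16))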
-func (me *webseedPeer) checkReceivedChunk(ri RequestIndex) error {
- return nil
-}
-
-func (me *webseedPeer) nominalMaxRequests() maxRequests {
- // TODO: Implement an algorithm that assigns this based on sharing chunks across peers. For now
- // we just allow 2 MiB worth of requests.
- return me.peer.PeerMaxRequests
+func (me *webseedPeer) expectingChunks() bool {
+ return len(me.activeRequests) > 0
}
-func (me *webseedPeer) acksCancels() bool {
- return false
+func (me *webseedPeer) checkReceivedChunk(ri RequestIndex) error {
+ return nil
}
func (me *webseedPeer) numRequests() int {
end: end,
}
ws.activeRequests[&wsReq] = struct{}{}
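+ // A new active request may flip expectingChunks true; the matching update
+ // when the request completes is in deleteActiveRequest.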
+ ws.peer.updateExpectingChunks()
panicif.Zero(ws.hostKey)
ws.peer.t.cl.numWebSeedRequests[ws.hostKey]++
ws.slogger().Debug(
torrent.Add("webseed request error count", 1)
// This used to occur only on webseed.ErrTooFast, but I think it makes sense to slow down on any
// kind of error. Pausing here will starve the available requester slots which slows things
- // down.
+ // down. TODO: I don't think this will help anymore; instead, register reduced
+ // available concurrency for the host/cost key.
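+ // Pause for a random duration of up to 10 seconds, unless the peer closes first.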
select {
case <-ws.peer.closed.Done():
case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))):
func (ws *webseedPeer) deleteActiveRequest(wr *webseedRequest) {
g.MustDelete(ws.activeRequests, wr)
ws.peer.t.cl.numWebSeedRequests[ws.hostKey]--
+ ws.peer.updateExpectingChunks()
}
func (ws *webseedPeer) spawnRequests() {