package torrent
import (
- "errors"
"fmt"
"io"
"log/slog"
"github.com/anacrolix/multiless"
"github.com/anacrolix/torrent/internal/alloclim"
- requestStrategy "github.com/anacrolix/torrent/internal/request-strategy"
"github.com/anacrolix/torrent/mse"
pp "github.com/anacrolix/torrent/peer_protocol"
typedRoaring "github.com/anacrolix/torrent/typed-roaring"
return
}
-func (p *PeerConn) initRequestState() {
- p.requestState.Requests = &peerRequests{}
-}
-
func (cn *Peer) updateExpectingChunks() {
if cn.peerImpl.expectingChunks() {
if cn.lastStartedExpectingToReceiveChunks.IsZero() {
}
}
-func (cn *PeerConn) expectingChunks() bool {
- if cn.requestState.Requests.IsEmpty() {
- return false
- }
- if !cn.requestState.Interested {
- return false
- }
- if !cn.peerChoking {
- return true
- }
- haveAllowedFastRequests := false
- cn.peerAllowedFast.Iterate(func(i pieceIndex) bool {
- haveAllowedFastRequests = roaringBitmapRangeCardinality[RequestIndex](
- cn.requestState.Requests,
- cn.t.pieceRequestIndexBegin(i),
- cn.t.pieceRequestIndexBegin(i+1),
- ) == 0
- return !haveAllowedFastRequests
- })
- return haveAllowedFastRequests
-}
-
-func (cn *PeerConn) cumInterest() time.Duration {
- ret := cn.priorInterest
- if cn.requestState.Interested {
- ret += time.Since(cn.lastBecameInterested)
- }
- return ret
-}
-
func (cn *Peer) locker() *lockWithDeferreds {
return cn.t.cl.locker()
}
-func (cn *PeerConn) supportsExtension(ext pp.ExtensionName) bool {
- _, ok := cn.PeerExtensionIDs[ext]
- return ok
-}
-
// The best guess at number of pieces in the torrent for this peer.
func (cn *Peer) bestPeerNumPieces() pieceIndex {
if cn.t.haveInfo() {
return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}
-// Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
-func (cn *PeerConn) statusFlags() (ret string) {
- c := func(b byte) {
- ret += string([]byte{b})
- }
- if cn.requestState.Interested {
- c('i')
- }
- if cn.choking {
- c('c')
- }
- c(':')
- ret += cn.connectionFlags()
- c(':')
- if cn.peerInterested {
- c('i')
- }
- if cn.peerChoking {
- c('c')
- }
- return
-}
-
func (cn *Peer) downloadRate() float64 {
num := cn._stats.BytesReadUsefulData.Int64()
if num == 0 {
return p.Stats().DownloadRate
}
-func (cn *PeerConn) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) {
- var last Option[pieceIndex]
- var count int
- next := func(item Option[pieceIndex]) {
- if item == last {
- count++
- } else {
- if count != 0 {
- f(last.Value, count)
- }
- last = item
- count = 1
- }
- }
- cn.requestState.Requests.Iterate(func(requestIndex requestStrategy.RequestIndex) bool {
- next(Some(cn.t.pieceIndexOfRequestIndex(requestIndex)))
- return true
- })
- next(None[pieceIndex]())
-}
-
-func (cn *PeerConn) peerImplWriteStatus(w io.Writer) {
- prio, err := cn.peerPriority()
- prioStr := fmt.Sprintf("%08x", prio)
- if err != nil {
- prioStr += ": " + err.Error()
- }
- fmt.Fprintf(w, "bep40-prio: %v\n", prioStr)
- fmt.Fprintf(w, "last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
- eventAgeString(cn.lastMessageReceived),
- eventAgeString(cn.completedHandshake),
- eventAgeString(cn.lastHelpful()),
- cn.cumInterest(),
- cn.totalExpectingTime(),
- )
- fmt.Fprintf(w,
- "%s completed, chunks uploaded: %v\n",
- cn.completedString(),
- &cn._stats.ChunksWritten,
- )
- fmt.Fprintf(w, "requested pieces:")
- cn.iterContiguousPieceRequests(func(piece pieceIndex, count int) {
- fmt.Fprintf(w, " %v(%v)", piece, count)
- })
-}
-
func (cn *Peer) writeStatus(w io.Writer) {
// \t isn't preserved in <pre> blocks?
if cn.closed.IsSet() {
return
}
-func (cn *PeerConn) setInterested(interested bool) bool {
- if cn.requestState.Interested == interested {
- return true
- }
- cn.requestState.Interested = interested
- if interested {
- cn.lastBecameInterested = time.Now()
- } else if !cn.lastBecameInterested.IsZero() {
- cn.priorInterest += time.Since(cn.lastBecameInterested)
- }
- cn.updateExpectingChunks()
- // log.Printf("%p: setting interest: %v", cn, interested)
- return cn.writeInterested(interested)
-}
-
// The function takes a message to be sent, and returns true if more messages
// are okay.
type messageWriter func(pp.Message) bool
-// This function seems to only used by Peer.request. It's all logic checks, so maybe we can no-op it
-// when we want to go fast.
-func (cn *PeerConn) shouldRequest(r RequestIndex) error {
- err := cn.t.checkValidReceiveChunk(cn.t.requestIndexToRequest(r))
- if err != nil {
- return err
- }
- pi := cn.t.pieceIndexOfRequestIndex(r)
- if cn.requestState.Cancelled.Contains(r) {
- return errors.New("request is cancelled and waiting acknowledgement")
- }
- if !cn.peerHasPiece(pi) {
- return errors.New("requesting piece peer doesn't have")
- }
- if !cn.t.peerIsActive(cn.peerPtr()) {
- panic("requesting but not in active conns")
- }
- if cn.closed.IsSet() {
- panic("requesting when connection is closed")
- }
- if cn.t.hashingPiece(pi) {
- panic("piece is being hashed")
- }
- p := cn.t.piece(pi)
- if p.marking {
- panic("piece is being marked")
- }
- if cn.t.pieceQueuedForHash(pi) {
- panic("piece is queued for hash")
- }
- if cn.peerChoking && !cn.peerAllowedFast.Contains(pi) {
- // This could occur if we made a request with the fast extension, and then got choked and
- // haven't had the request rejected yet.
- if !cn.requestState.Requests.Contains(r) {
- panic("peer choking and piece not allowed fast")
- }
- }
- return nil
-}
-
-func (cn *PeerConn) mustRequest(r RequestIndex) bool {
- more, err := cn.request(r)
- if err != nil {
- panic(err)
- }
- return more
-}
-
-func (cn *PeerConn) request(r RequestIndex) (more bool, err error) {
- if err := cn.shouldRequest(r); err != nil {
- panic(err)
- }
- if cn.requestState.Requests.Contains(r) {
- return true, nil
- }
- if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
- return true, errors.New("too many outstanding requests")
- }
- cn.requestState.Requests.Add(r)
- if cn.validReceiveChunks == nil {
- cn.validReceiveChunks = make(map[RequestIndex]int)
- }
- cn.validReceiveChunks[r]++
- cn.t.requestState[r] = requestState{
- peer: cn,
- when: time.Now(),
- }
- cn.updateExpectingChunks()
- ppReq := cn.t.requestIndexToRequest(r)
- for _, f := range cn.callbacks.SentRequest {
- f(PeerRequestEvent{cn.peerPtr(), ppReq})
- }
- return cn._request(ppReq), nil
-}
-
-func (me *PeerConn) cancel(r RequestIndex) {
- if !me.deleteRequest(r) {
- panic("request not existing should have been guarded")
- }
- me.handleCancel(r)
- me.decPeakRequests()
- if me.isLowOnRequests() {
- me.onNeedUpdateRequests(peerUpdateRequestsPeerCancelReason)
- }
-}
-
-// Sets a reason to update requests, and if there wasn't already one, handle it.
-func (cn *PeerConn) onNeedUpdateRequests(reason updateRequestReason) {
- if cn.needRequestUpdate != "" {
- return
- }
- cn.needRequestUpdate = reason
- // Run this before the Client lock is released.
- cn.locker().DeferUniqueUnaryFunc(cn, cn.handleOnNeedUpdateRequests)
-}
-
// Emits the indices in the Bitmaps bms in order, never repeating any index.
// skip is mutated during execution, and its initial values will never be
// emitted.
}
}
-// Returns true if it was valid to reject the request.
-func (c *PeerConn) remoteRejectedRequest(r RequestIndex) bool {
- if c.deleteRequest(r) {
- c.decPeakRequests()
- } else if !c.requestState.Cancelled.CheckedRemove(r) {
- // The request was already cancelled.
- return false
- }
- if c.isLowOnRequests() {
- c.onNeedUpdateRequests(peerUpdateRequestsRemoteRejectReason)
- }
- c.decExpectedChunkReceive(r)
- return true
-}
-
-func (c *PeerConn) decExpectedChunkReceive(r RequestIndex) {
- count := c.validReceiveChunks[r]
- if count == 1 {
- delete(c.validReceiveChunks, r)
- } else if count > 1 {
- c.validReceiveChunks[r] = count - 1
- } else {
- panic(r)
- }
-}
-
func (c *Peer) doChunkReadStats(size int64) {
c.allStats(func(cs *ConnStats) { cs.receivedChunk(size) })
}
return c.peerPieces().Intersects(&c.t._pendingPieces)
}
-// Returns true if an outstanding request is removed. Cancelled requests should be handled
-// separately.
-func (c *PeerConn) deleteRequest(r RequestIndex) bool {
- if !c.requestState.Requests.CheckedRemove(r) {
- return false
- }
- for _, f := range c.callbacks.DeletedRequest {
- f(PeerRequestEvent{c.peerPtr(), c.t.requestIndexToRequest(r)})
- }
- c.updateExpectingChunks()
- // TODO: Can't this happen if a request is stolen?
- if c.t.requestingPeer(r) != c {
- panic("only one peer should have a given request at a time")
- }
- delete(c.t.requestState, r)
- // c.t.iterPeers(func(p *Peer) {
- // if p.isLowOnRequests() {
- // p.onNeedUpdateRequests("Peer.deleteRequest")
- // }
- // })
- return true
-}
-
-func (c *PeerConn) deleteAllRequests(reason updateRequestReason) {
- if c.requestState.Requests.IsEmpty() {
- return
- }
- c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
- if !c.deleteRequest(x) {
- panic("request should exist")
- }
- return true
- })
- c.assertNoRequests()
- c.t.iterPeers(func(p *Peer) {
- if p.isLowOnRequests() {
- p.onNeedUpdateRequests(reason)
- }
- })
-}
-
-func (c *PeerConn) assertNoRequests() {
- if !c.requestState.Requests.IsEmpty() {
- panic(c.requestState.Requests.GetCardinality())
- }
-}
-
-func (c *PeerConn) cancelAllRequests() {
- c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
- c.cancel(x)
- return true
- })
- c.assertNoRequests()
-}
-
func (c *Peer) peerPriority() (peerPriority, error) {
return bep40Priority(c.remoteIpPort(), c.localPublicAddr)
}
return pc, ok
}
-func (p *PeerConn) uncancelledRequests() uint64 {
- return p.requestState.Requests.GetCardinality()
-}
-
type peerLocalPublicAddr = IpPort
-func (p *PeerConn) isLowOnRequests() bool {
- return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty()
-}
-
func (p *Peer) decPeakRequests() {
// // This can occur when peak requests are altered by the update request timer to be lower than
// // the actual number of outstanding requests. Let's let it go negative and see what happens. I
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/multiless"
- requestStrategy "github.com/anacrolix/torrent/internal/request-strategy"
+
"golang.org/x/time/rate"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/internal/alloclim"
+ requestStrategy "github.com/anacrolix/torrent/internal/request-strategy"
+
"github.com/anacrolix/torrent/merkle"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/mse"
me.protocolLogger = me.logger.WithNames(protocolLoggingName)
}
+// Methods moved from peer.go (in their original order):
+
+func (p *PeerConn) initRequestState() {
+ p.requestState.Requests = &peerRequests{}
+}
+
+func (cn *PeerConn) expectingChunks() bool {
+ if cn.requestState.Requests.IsEmpty() {
+ return false
+ }
+ if !cn.requestState.Interested {
+ return false
+ }
+ if !cn.peerChoking {
+ return true
+ }
+ haveAllowedFastRequests := false
+ cn.peerAllowedFast.Iterate(func(i pieceIndex) bool {
+ haveAllowedFastRequests = roaringBitmapRangeCardinality[RequestIndex](
+ cn.requestState.Requests,
+ cn.t.pieceRequestIndexBegin(i),
+ cn.t.pieceRequestIndexBegin(i+1),
+ ) == 0
+ return !haveAllowedFastRequests
+ })
+ return haveAllowedFastRequests
+}
+
+func (cn *PeerConn) cumInterest() time.Duration {
+ ret := cn.priorInterest
+ if cn.requestState.Interested {
+ ret += time.Since(cn.lastBecameInterested)
+ }
+ return ret
+}
+
+func (cn *PeerConn) supportsExtension(ext pp.ExtensionName) bool {
+ _, ok := cn.PeerExtensionIDs[ext]
+ return ok
+}
+
+// Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
+func (cn *PeerConn) statusFlags() (ret string) {
+ c := func(b byte) {
+ ret += string([]byte{b})
+ }
+ if cn.requestState.Interested {
+ c('i')
+ }
+ if cn.choking {
+ c('c')
+ }
+ c(':')
+ ret += cn.connectionFlags()
+ c(':')
+ if cn.peerInterested {
+ c('i')
+ }
+ if cn.peerChoking {
+ c('c')
+ }
+ return
+}
+
+func (cn *PeerConn) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) {
+ var last Option[pieceIndex]
+ var count int
+ next := func(item Option[pieceIndex]) {
+ if item == last {
+ count++
+ } else {
+ if count != 0 {
+ f(last.Value, count)
+ }
+ last = item
+ count = 1
+ }
+ }
+ cn.requestState.Requests.Iterate(func(requestIndex requestStrategy.RequestIndex) bool {
+ next(Some(cn.t.pieceIndexOfRequestIndex(requestIndex)))
+ return true
+ })
+ next(None[pieceIndex]())
+}
+
+func (cn *PeerConn) peerImplWriteStatus(w io.Writer) {
+ prio, err := cn.peerPriority()
+ prioStr := fmt.Sprintf("%08x", prio)
+ if err != nil {
+ prioStr += ": " + err.Error()
+ }
+ fmt.Fprintf(w, "bep40-prio: %v\n", prioStr)
+ fmt.Fprintf(w, "last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
+ eventAgeString(cn.lastMessageReceived),
+ eventAgeString(cn.completedHandshake),
+ eventAgeString(cn.lastHelpful()),
+ cn.cumInterest(),
+ cn.totalExpectingTime(),
+ )
+ fmt.Fprintf(w,
+ "%s completed, chunks uploaded: %v\n",
+ cn.completedString(),
+ &cn._stats.ChunksWritten,
+ )
+ fmt.Fprintf(w, "requested pieces:")
+ cn.iterContiguousPieceRequests(func(piece pieceIndex, count int) {
+ fmt.Fprintf(w, " %v(%v)", piece, count)
+ })
+}
+
+func (cn *PeerConn) setInterested(interested bool) bool {
+ if cn.requestState.Interested == interested {
+ return true
+ }
+ cn.requestState.Interested = interested
+ if interested {
+ cn.lastBecameInterested = time.Now()
+ } else if !cn.lastBecameInterested.IsZero() {
+ cn.priorInterest += time.Since(cn.lastBecameInterested)
+ }
+ cn.updateExpectingChunks()
+ return cn.writeInterested(interested)
+}
+
+// This function seems to only used by Peer.request. It's all logic checks, so maybe we can no-op it
+// when we want to go fast.
+func (cn *PeerConn) shouldRequest(r RequestIndex) error {
+ err := cn.t.checkValidReceiveChunk(cn.t.requestIndexToRequest(r))
+ if err != nil {
+ return err
+ }
+ pi := cn.t.pieceIndexOfRequestIndex(r)
+ if cn.requestState.Cancelled.Contains(r) {
+ return errors.New("request is cancelled and waiting acknowledgement")
+ }
+ if !cn.peerHasPiece(pi) {
+ return errors.New("requesting piece peer doesn't have")
+ }
+ if !cn.t.peerIsActive(cn.peerPtr()) {
+ panic("requesting but not in active conns")
+ }
+ if cn.closed.IsSet() {
+ panic("requesting when connection is closed")
+ }
+ if cn.t.hashingPiece(pi) {
+ panic("piece is being hashed")
+ }
+ p := cn.t.piece(pi)
+ if p.marking {
+ panic("piece is being marked")
+ }
+ if cn.t.pieceQueuedForHash(pi) {
+ panic("piece is queued for hash")
+ }
+ if cn.peerChoking && !cn.peerAllowedFast.Contains(pi) {
+ // This could occur if we made a request with the fast extension, and then got choked and
+ // haven't had the request rejected yet.
+ if !cn.requestState.Requests.Contains(r) {
+ panic("peer choking and piece not allowed fast")
+ }
+ }
+ return nil
+}
+
+func (cn *PeerConn) mustRequest(r RequestIndex) bool {
+ more, err := cn.request(r)
+ if err != nil {
+ panic(err)
+ }
+ return more
+}
+
+func (cn *PeerConn) request(r RequestIndex) (more bool, err error) {
+ if err := cn.shouldRequest(r); err != nil {
+ panic(err)
+ }
+ if cn.requestState.Requests.Contains(r) {
+ return true, nil
+ }
+ if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
+ return true, errors.New("too many outstanding requests")
+ }
+ cn.requestState.Requests.Add(r)
+ if cn.validReceiveChunks == nil {
+ cn.validReceiveChunks = make(map[RequestIndex]int)
+ }
+ cn.validReceiveChunks[r]++
+ cn.t.requestState[r] = requestState{
+ peer: cn,
+ when: time.Now(),
+ }
+ cn.updateExpectingChunks()
+ ppReq := cn.t.requestIndexToRequest(r)
+ for _, f := range cn.callbacks.SentRequest {
+ f(PeerRequestEvent{cn.peerPtr(), ppReq})
+ }
+ return cn._request(ppReq), nil
+}
+
+func (me *PeerConn) cancel(r RequestIndex) {
+ if !me.deleteRequest(r) {
+ panic("request not existing should have been guarded")
+ }
+ me.handleCancel(r)
+ me.decPeakRequests()
+ if me.isLowOnRequests() {
+ me.onNeedUpdateRequests(peerUpdateRequestsPeerCancelReason)
+ }
+}
+
+// Sets a reason to update requests, and if there wasn't already one, handle it.
+func (cn *PeerConn) onNeedUpdateRequests(reason updateRequestReason) {
+ if cn.needRequestUpdate != "" {
+ return
+ }
+ cn.needRequestUpdate = reason
+ // Run this before the Client lock is released.
+ cn.locker().DeferUniqueUnaryFunc(cn, cn.handleOnNeedUpdateRequests)
+}
+
+// Returns true if it was valid to reject the request.
+func (c *PeerConn) remoteRejectedRequest(r RequestIndex) bool {
+ if c.deleteRequest(r) {
+ c.decPeakRequests()
+ } else if !c.requestState.Cancelled.CheckedRemove(r) {
+ // The request was already cancelled.
+ return false
+ }
+ if c.isLowOnRequests() {
+ c.onNeedUpdateRequests(peerUpdateRequestsRemoteRejectReason)
+ }
+ c.decExpectedChunkReceive(r)
+ return true
+}
+
+func (c *PeerConn) decExpectedChunkReceive(r RequestIndex) {
+ count := c.validReceiveChunks[r]
+ if count == 1 {
+ delete(c.validReceiveChunks, r)
+ } else if count > 1 {
+ c.validReceiveChunks[r] = count - 1
+ } else {
+ panic(r)
+ }
+}
+
+// Returns true if an outstanding request is removed. Cancelled requests should be handled
+// separately.
+func (c *PeerConn) deleteRequest(r RequestIndex) bool {
+ if !c.requestState.Requests.CheckedRemove(r) {
+ return false
+ }
+ for _, f := range c.callbacks.DeletedRequest {
+ f(PeerRequestEvent{c.peerPtr(), c.t.requestIndexToRequest(r)})
+ }
+ c.updateExpectingChunks()
+ // TODO: Can't this happen if a request is stolen?
+ if c.t.requestingPeer(r) != c {
+ panic("only one peer should have a given request at a time")
+ }
+ delete(c.t.requestState, r)
+ // c.t.iterPeers(func(p *Peer) {
+ // if p.isLowOnRequests() {
+ // p.onNeedUpdateRequests("Peer.deleteRequest")
+ // }
+ // })
+ return true
+}
+
+func (c *PeerConn) deleteAllRequests(reason updateRequestReason) {
+ if c.requestState.Requests.IsEmpty() {
+ return
+ }
+ c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
+ if !c.deleteRequest(x) {
+ panic("request should exist")
+ }
+ return true
+ })
+ c.assertNoRequests()
+ c.t.iterPeers(func(p *Peer) {
+ if p.isLowOnRequests() {
+ p.onNeedUpdateRequests(reason)
+ }
+ })
+}
+
+func (c *PeerConn) assertNoRequests() {
+ if !c.requestState.Requests.IsEmpty() {
+ panic(c.requestState.Requests.GetCardinality())
+ }
+}
+
+func (c *PeerConn) cancelAllRequests() {
+ c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
+ c.cancel(x)
+ return true
+ })
+ c.assertNoRequests()
+}
+
+func (p *PeerConn) uncancelledRequests() uint64 {
+ return p.requestState.Requests.GetCardinality()
+}
+
+func (p *PeerConn) isLowOnRequests() bool {
+ return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty()
+}
+
func (c *PeerConn) checkReceivedChunk(req RequestIndex, msg *pp.Message, ppReq Request) (intended bool, err error) {
if c.validReceiveChunks[req] <= 0 {
ChunksReceived.Add("unexpected", 1)