"bytes"
"errors"
"fmt"
+ "golang.org/x/time/rate"
"io"
"math/rand"
"net"
- "sort"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/RoaringBitmap/roaring"
+ "github.com/anacrolix/chansync"
+ . "github.com/anacrolix/generics"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/iter"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/multiless"
-
- "github.com/anacrolix/chansync"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/mse"
pp "github.com/anacrolix/torrent/peer_protocol"
request_strategy "github.com/anacrolix/torrent/request-strategy"
+ "github.com/anacrolix/torrent/typed-roaring"
)
type PeerSource string
String() string
}
-// Since we have to store all the requests in memory, we can't reasonably exceed what would be
-// indexable with the memory space available.
type (
- maxRequests = int
- requestState = request_strategy.PeerRequestState
+ // Since we have to store all the requests in memory, we can't reasonably exceed what could be
+ // indexed with the memory space available.
+ maxRequests = int
)
type Peer struct {
peerImpl
callbacks *Callbacks
- outgoing bool
- Network string
- RemoteAddr PeerRemoteAddr
+ outgoing bool
+ Network string
+ RemoteAddr PeerRemoteAddr
+ bannableAddr Option[bannableAddr]
// True if the connection is operating over MSE obfuscation.
headerEncrypted bool
cryptoMethod mse.CryptoMethod
// Stuff controlled by the local peer.
needRequestUpdate string
- requestState requestState
+ requestState request_strategy.PeerRequestState
updateRequestsTimer *time.Timer
lastRequestUpdate time.Time
peakRequests maxRequests
peerMinPieces pieceIndex
// Pieces we've accepted chunks for from the peer.
peerTouchedPieces map[pieceIndex]struct{}
- peerAllowedFast roaring.Bitmap
+ peerAllowedFast typedRoaring.Bitmap[pieceIndex]
PeerMaxRequests maxRequests // Maximum pending requests the peer allows.
PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber
logger log.Logger
}
+type peerRequests = orderedBitmap[RequestIndex]
+
+func (p *Peer) initRequestState() {
+ p.requestState.Requests = &peerRequests{}
+}
+
// Maintains the state of a BitTorrent-protocol based connection with a peer.
type PeerConn struct {
Peer
return true
}
haveAllowedFastRequests := false
- cn.peerAllowedFast.Iterate(func(i uint32) bool {
- haveAllowedFastRequests = roaringBitmapRangeCardinality(
- &cn.requestState.Requests,
- cn.t.pieceRequestIndexOffset(pieceIndex(i)),
- cn.t.pieceRequestIndexOffset(pieceIndex(i+1)),
+ cn.peerAllowedFast.Iterate(func(i pieceIndex) bool {
+ haveAllowedFastRequests = roaringBitmapRangeCardinality[RequestIndex](
+ cn.requestState.Requests,
+ cn.t.pieceRequestIndexOffset(i),
+ cn.t.pieceRequestIndexOffset(i+1),
) == 0
return !haveAllowedFastRequests
})
}
func (cn *Peer) remoteChokingPiece(piece pieceIndex) bool {
- return cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(piece))
+ return cn.peerChoking && !cn.peerAllowedFast.Contains(piece)
}
// Returns true if the connection is over IPv6.
// considering only their networking properties. If ok is false, we can't
// decide.
func (l *PeerConn) hasPreferredNetworkOver(r *PeerConn) bool {
- var ml multiLess
- ml.NextBool(l.isPreferredDirection(), r.isPreferredDirection())
- ml.NextBool(!l.utp(), !r.utp())
- ml.NextBool(l.ipv6(), r.ipv6())
+ var ml multiless.Computation
+ ml = ml.Bool(r.isPreferredDirection(), l.isPreferredDirection())
+ ml = ml.Bool(l.utp(), r.utp())
+ ml = ml.Bool(r.ipv6(), l.ipv6())
return ml.Less()
}
return float64(num) / cn.totalExpectingTime().Seconds()
}
-func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
- ret = make(map[pieceIndex]int)
- cn.requestState.Requests.Iterate(func(x uint32) bool {
- ret[pieceIndex(x/cn.t.chunksPerRegularPiece())]++
+func (cn *Peer) DownloadRate() float64 {
+ cn.locker().Lock()
+ defer cn.locker().Unlock()
+
+ return cn.downloadRate()
+}
+
+func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) {
+ var last Option[pieceIndex]
+ var count int
+ next := func(item Option[pieceIndex]) {
+ if item == last {
+ count++
+ } else {
+ if count != 0 {
+ f(last.Value, count)
+ }
+ last = item
+ count = 1
+ }
+ }
+ cn.requestState.Requests.Iterate(func(requestIndex request_strategy.RequestIndex) bool {
+ next(Some(cn.t.pieceIndexOfRequestIndex(requestIndex)))
return true
})
- return
+ next(None[pieceIndex]())
}
func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
cn.downloadRate()/(1<<10),
)
fmt.Fprintf(w, " requested pieces:")
- type pieceNumRequestsType struct {
- piece pieceIndex
- numRequests int
- }
- var pieceNumRequests []pieceNumRequestsType
- for piece, count := range cn.numRequestsByPiece() {
- pieceNumRequests = append(pieceNumRequests, pieceNumRequestsType{piece, count})
- }
- sort.Slice(pieceNumRequests, func(i, j int) bool {
- return pieceNumRequests[i].piece < pieceNumRequests[j].piece
+ cn.iterContiguousPieceRequests(func(piece pieceIndex, count int) {
+ fmt.Fprintf(w, " %v(%v)", piece, count)
})
- for _, elem := range pieceNumRequests {
- fmt.Fprintf(w, " %v(%v)", elem.piece, elem.numRequests)
- }
fmt.Fprintf(w, "\n")
}
// The actual value to use as the maximum outbound requests.
func (cn *Peer) nominalMaxRequests() maxRequests {
- return maxRequests(maxInt(1, minInt(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests)))
+ return maxInt(1, minInt(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests))
}
func (cn *Peer) totalExpectingTime() (ret time.Duration) {
// This function seems to only used by Peer.request. It's all logic checks, so maybe we can no-op it
// when we want to go fast.
func (cn *Peer) shouldRequest(r RequestIndex) error {
- pi := pieceIndex(r / cn.t.chunksPerRegularPiece())
+ pi := cn.t.pieceIndexOfRequestIndex(r)
if cn.requestState.Cancelled.Contains(r) {
return errors.New("request is cancelled and waiting acknowledgement")
}
if cn.t.pieceQueuedForHash(pi) {
panic("piece is queued for hash")
}
- if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(pi)) {
+ if cn.peerChoking && !cn.peerAllowedFast.Contains(pi) {
// This could occur if we made a request with the fast extension, and then got choked and
// haven't had the request rejected yet.
if !cn.requestState.Requests.Contains(r) {
cn.validReceiveChunks = make(map[RequestIndex]int)
}
cn.validReceiveChunks[r]++
- cn.t.pendingRequests[r] = cn
- cn.t.lastRequested[r] = time.Now()
+ cn.t.requestState[r] = requestState{
+ peer: cn,
+ when: time.Now(),
+ }
cn.updateExpectingChunks()
ppReq := cn.t.requestIndexToRequest(r)
for _, f := range cn.callbacks.SentRequest {
delete(c.peerRequests, r)
}
-func (c *PeerConn) onReadRequest(r Request) error {
+func (c *PeerConn) maximumPeerRequestChunkLength() (_ Option[int]) {
+ uploadRateLimiter := c.t.cl.config.UploadRateLimiter
+ if uploadRateLimiter.Limit() == rate.Inf {
+ return
+ }
+ return Some(uploadRateLimiter.Burst())
+}
+
+// startFetch is for testing purposes currently.
+func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
if _, ok := c.peerRequests[r]; ok {
torrent.Add("duplicate requests received", 1)
+ if c.fastEnabled() {
+ return errors.New("received duplicate request with fast enabled")
+ }
return nil
}
if c.choking {
// BEP 6 says we may close here if we choose.
return nil
}
+ if opt := c.maximumPeerRequestChunkLength(); opt.Ok && int(r.Length) > opt.Value {
+ err := fmt.Errorf("peer requested chunk too long (%v)", r.Length)
+ c.logger.Levelf(log.Warning, err.Error())
+ if c.fastEnabled() {
+ c.reject(r)
+ return nil
+ } else {
+ return err
+ }
+ }
if !c.t.havePiece(pieceIndex(r.Index)) {
- // This isn't necessarily them screwing up. We can drop pieces
- // from our storage, and can't communicate this to peers
- // except by reconnecting.
+ // TODO: Tell the peer we don't have the piece, and reject this request.
requestsReceivedForMissingPieces.Add(1)
return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
}
}
value := &peerRequestState{}
c.peerRequests[r] = value
- go c.peerRequestDataReader(r, value)
+ if startFetch {
+ // TODO: Limit peer request data read concurrency.
+ go c.peerRequestDataReader(r, value)
+ }
return nil
}
}
torrent.Add("peer request data read successes", 1)
prs.data = b
+ // This might be required for the error case too (#752 and #753).
c.tickleWriter()
}
}
}
func (c *PeerConn) logProtocolBehaviour(level log.Level, format string, arg ...interface{}) {
- c.logger.WithLevel(level).WithContextText(fmt.Sprintf(
+ c.logger.WithContextText(fmt.Sprintf(
"peer id %q, ext v %q", c.PeerID, c.PeerClientName.Load(),
- )).SkipCallers(1).Printf(format, arg...)
+ )).SkipCallers(1).Levelf(level, format, arg...)
}
// Processes incoming BitTorrent wire-protocol messages. The client lock is held upon entry and
decoder := pp.Decoder{
R: bufio.NewReaderSize(c.r, 1<<17),
- MaxLength: 256 * 1024,
+ MaxLength: 4 * pp.Integer(max(int64(t.chunkSize), defaultChunkSize)),
Pool: &t.chunkPool,
}
for {
break
}
if !c.fastEnabled() {
- if !c.deleteAllRequests().IsEmpty() {
- c.t.iterPeers(func(p *Peer) {
- if p.isLowOnRequests() {
- p.updateRequests("choked by non-fast PeerConn")
- }
- })
- }
+ c.deleteAllRequests("choked by non-fast PeerConn")
} else {
// We don't decrement pending requests here, let's wait for the peer to either
// reject or satisfy the outstanding requests. Additionally, some peers may unchoke
}
c.peerChoking = false
preservedCount := 0
- c.requestState.Requests.Iterate(func(x uint32) bool {
- if !c.peerAllowedFast.Contains(x / c.t.chunksPerRegularPiece()) {
+ c.requestState.Requests.Iterate(func(x RequestIndex) bool {
+ if !c.peerAllowedFast.Contains(c.t.pieceIndexOfRequestIndex(x)) {
preservedCount++
}
return true
if preservedCount != 0 {
// TODO: Yes this is a debug log but I'm not happy with the state of the logging lib
// right now.
- c.logger.WithLevel(log.Debug).Printf(
+ c.logger.Levelf(log.Debug,
"%v requests were preserved while being choked (fast=%v)",
preservedCount,
c.fastEnabled())
+
torrent.Add("requestsPreservedThroughChoking", int64(preservedCount))
}
if !c.t._pendingPieces.IsEmpty() {
err = c.peerSentBitfield(msg.Bitfield)
case pp.Request:
r := newRequestFromMessage(&msg)
- err = c.onReadRequest(r)
+ err = c.onReadRequest(r, true)
case pp.Piece:
c.doChunkReadStats(int64(len(msg.Piece)))
err = c.receiveChunk(&msg)
})
case pp.Suggest:
torrent.Add("suggests received", 1)
- log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index).SetLevel(log.Debug).Log(c.t.logger)
+ log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index).LogLevel(log.Debug, c.t.logger)
c.updateRequests("suggested")
case pp.HaveAll:
err = c.onPeerSentHaveAll()
case pp.Reject:
req := newRequestFromMessage(&msg)
if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req)) {
- log.Printf("received invalid reject [request=%v, peer=%v]", req, c)
+ c.logger.Printf("received invalid reject [request=%v, peer=%v]", req, c)
err = fmt.Errorf("received invalid reject [request=%v]", req)
}
case pp.AllowedFast:
torrent.Add("allowed fasts received", 1)
- log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
+ log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).LogLevel(log.Debug, c.t.logger)
c.updateRequests("PeerConn.mainReadLoop allowed fast")
case pp.Extended:
err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
ppReq := newRequestFromMessage(msg)
req := c.t.requestIndexFromRequest(ppReq)
+ t := c.t
+
+ if c.bannableAddr.Ok {
+ t.smartBanCache.RecordBlock(c.bannableAddr.Value, req, msg.Piece)
+ }
if c.peerChoking {
chunksReceived.Add("while choked", 1)
}
c.decExpectedChunkReceive(req)
- if c.peerChoking && c.peerAllowedFast.Contains(bitmap.BitIndex(ppReq.Index)) {
+ if c.peerChoking && c.peerAllowedFast.Contains(pieceIndex(ppReq.Index)) {
chunksReceived.Add("due to allowed fast", 1)
}
}
}
- t := c.t
cl := t.cl
// Do we actually want this chunk?
piece.unpendChunkIndex(chunkIndexFromChunkSpec(ppReq.ChunkSpec, t.chunkSize))
// Cancel pending requests for this chunk from *other* peers.
- if p := t.pendingRequests[req]; p != nil {
+ if p := t.requestingPeer(req); p != nil {
if p == c {
panic("should not be pending request from conn that just received it")
}
cn.t.dropConnection(cn)
}
+func (cn *PeerConn) ban() {
+ cn.t.cl.banPeerIP(cn.remoteIp())
+}
+
func (cn *Peer) netGoodPiecesDirtied() int64 {
return cn._stats.PiecesDirtiedGood.Int64() - cn._stats.PiecesDirtiedBad.Int64()
}
if c.t.requestingPeer(r) != c {
panic("only one peer should have a given request at a time")
}
- delete(c.t.pendingRequests, r)
- delete(c.t.lastRequested, r)
+ delete(c.t.requestState, r)
// c.t.iterPeers(func(p *Peer) {
// if p.isLowOnRequests() {
// p.updateRequests("Peer.deleteRequest")
return true
}
-func (c *Peer) deleteAllRequests() (deleted *roaring.Bitmap) {
- deleted = c.requestState.Requests.Clone()
- deleted.Iterate(func(x uint32) bool {
+func (c *Peer) deleteAllRequests(reason string) {
+ if c.requestState.Requests.IsEmpty() {
+ return
+ }
+ c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
if !c.deleteRequest(x) {
panic("request should exist")
}
return true
})
c.assertNoRequests()
+ c.t.iterPeers(func(p *Peer) {
+ if p.isLowOnRequests() {
+ p.updateRequests(reason)
+ }
+ })
return
}
}
}
-func (c *Peer) cancelAllRequests() (cancelled *roaring.Bitmap) {
- cancelled = c.requestState.Requests.Clone()
- cancelled.Iterate(func(x uint32) bool {
+func (c *Peer) cancelAllRequests() {
+ c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
c.cancel(x)
return true
})