"github.com/anacrolix/missinggo/iter"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/multiless"
+ "golang.org/x/time/rate"
+
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/mse"
peerImpl
callbacks *Callbacks
- outgoing bool
- Network string
- RemoteAddr PeerRemoteAddr
- bannableAddr Option[bannableAddr]
+ outgoing bool
+ Network string
+ RemoteAddr PeerRemoteAddr
+ // The local address as observed by the remote peer. WebRTC seems to get this right without needing hints from the
+ // config.
+ localPublicAddr peerLocalPublicAddr
+ bannableAddr Option[bannableAddr]
// True if the connection is operating over MSE obfuscation.
headerEncrypted bool
cryptoMethod mse.CryptoMethod
peerSentHaveAll bool
}
-func (cn *PeerConn) connStatusString() string {
- return fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString)
+func (cn *PeerConn) peerImplStatusLines() []string {
+ return []string{fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString)}
}
func (cn *Peer) updateExpectingChunks() {
return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces())
}
+func (cn *Peer) CompletedString() string {
+ return cn.completedString()
+}
+
// onGotInfo is invoked once the torrent's metainfo becomes available, sizing
// the connection's per-piece state to the now-known piece count.
func (cn *PeerConn) onGotInfo(info *metainfo.Info) {
	cn.setNumPieces(info.NumPieces())
}
return
}
+func (cn *Peer) StatusFlags() string {
+ return cn.statusFlags()
+}
+
func (cn *Peer) downloadRate() float64 {
num := cn._stats.BytesReadUsefulData.Int64()
if num == 0 {
return float64(num) / cn.totalExpectingTime().Seconds()
}
+func (cn *Peer) DownloadRate() float64 {
+ cn.locker().RLock()
+ defer cn.locker().RUnlock()
+
+ return cn.downloadRate()
+}
+
+func (cn *Peer) UploadRate() float64 {
+ cn.locker().RLock()
+ defer cn.locker().RUnlock()
+ num := cn._stats.BytesWrittenData.Int64()
+ if num == 0 {
+ return 0
+ }
+ return float64(num) / time.Now().Sub(cn.completedHandshake).Seconds()
+}
+
+
func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) {
var last Option[pieceIndex]
var count int
count++
} else {
if count != 0 {
- f(last.Value(), count)
+ f(last.Value, count)
}
last = item
count = 1
if cn.closed.IsSet() {
fmt.Fprint(w, "CLOSED: ")
}
- fmt.Fprintln(w, cn.connStatusString())
+ fmt.Fprintln(w, strings.Join(cn.peerImplStatusLines(), "\n"))
prio, err := cn.peerPriority()
prioStr := fmt.Sprintf("%08x", prio)
if err != nil {
prioStr += ": " + err.Error()
}
- fmt.Fprintf(w, " bep40-prio: %v\n", prioStr)
- fmt.Fprintf(w, " last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
+ fmt.Fprintf(w, "bep40-prio: %v\n", prioStr)
+ fmt.Fprintf(w, "last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
eventAgeString(cn.lastMessageReceived),
eventAgeString(cn.completedHandshake),
eventAgeString(cn.lastHelpful()),
cn.totalExpectingTime(),
)
fmt.Fprintf(w,
- " %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
+ "%s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
cn.completedString(),
len(cn.peerTouchedPieces),
&cn._stats.ChunksReadUseful,
cn.statusFlags(),
cn.downloadRate()/(1<<10),
)
- fmt.Fprintf(w, " requested pieces:")
+ fmt.Fprintf(w, "requested pieces:")
cn.iterContiguousPieceRequests(func(piece pieceIndex, count int) {
fmt.Fprintf(w, " %v(%v)", piece, count)
})
// The actual value to use as the maximum outbound requests.
func (cn *Peer) nominalMaxRequests() maxRequests {
- return maxRequests(maxInt(1, minInt(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests)))
+ return maxInt(1, minInt(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests))
}
func (cn *Peer) totalExpectingTime() (ret time.Duration) {
// This function seems to only used by Peer.request. It's all logic checks, so maybe we can no-op it
// when we want to go fast.
func (cn *Peer) shouldRequest(r RequestIndex) error {
+ err := cn.t.checkValidReceiveChunk(cn.t.requestIndexToRequest(r))
+ if err != nil {
+ return err
+ }
pi := cn.t.pieceIndexOfRequestIndex(r)
if cn.requestState.Cancelled.Contains(r) {
return errors.New("request is cancelled and waiting acknowledgement")
delete(c.peerRequests, r)
}
-func (c *PeerConn) onReadRequest(r Request) error {
+func (c *PeerConn) maximumPeerRequestChunkLength() (_ Option[int]) {
+ uploadRateLimiter := c.t.cl.config.UploadRateLimiter
+ if uploadRateLimiter.Limit() == rate.Inf {
+ return
+ }
+ return Some(uploadRateLimiter.Burst())
+}
+
+// startFetch is for testing purposes currently.
+func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
if _, ok := c.peerRequests[r]; ok {
torrent.Add("duplicate requests received", 1)
+ if c.fastEnabled() {
+ return errors.New("received duplicate request with fast enabled")
+ }
return nil
}
if c.choking {
// BEP 6 says we may close here if we choose.
return nil
}
+ if opt := c.maximumPeerRequestChunkLength(); opt.Ok && int(r.Length) > opt.Value {
+ err := fmt.Errorf("peer requested chunk too long (%v)", r.Length)
+ c.logger.Levelf(log.Warning, err.Error())
+ if c.fastEnabled() {
+ c.reject(r)
+ return nil
+ } else {
+ return err
+ }
+ }
if !c.t.havePiece(pieceIndex(r.Index)) {
- // This isn't necessarily them screwing up. We can drop pieces
- // from our storage, and can't communicate this to peers
- // except by reconnecting.
+ // TODO: Tell the peer we don't have the piece, and reject this request.
requestsReceivedForMissingPieces.Add(1)
return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
}
}
value := &peerRequestState{}
c.peerRequests[r] = value
- go c.peerRequestDataReader(r, value)
+ if startFetch {
+ // TODO: Limit peer request data read concurrency.
+ go c.peerRequestDataReader(r, value)
+ }
return nil
}
}
torrent.Add("peer request data read successes", 1)
prs.data = b
+ // This might be required for the error case too (#752 and #753).
c.tickleWriter()
}
}
err = c.peerSentBitfield(msg.Bitfield)
case pp.Request:
r := newRequestFromMessage(&msg)
- err = c.onReadRequest(r)
+ err = c.onReadRequest(r, true)
case pp.Piece:
c.doChunkReadStats(int64(len(msg.Piece)))
err = c.receiveChunk(&msg)
chunksReceived.Add("total", 1)
ppReq := newRequestFromMessage(msg)
- req := c.t.requestIndexFromRequest(ppReq)
t := c.t
+ err := t.checkValidReceiveChunk(ppReq)
+ if err != nil {
+ err = log.WithLevel(log.Warning, err)
+ return err
+ }
+ req := c.t.requestIndexFromRequest(ppReq)
- if c.bannableAddr.Ok() {
- t.smartBanCache.RecordBlock(c.bannableAddr.Value(), req, msg.Piece)
+ if c.bannableAddr.Ok {
+ t.smartBanCache.RecordBlock(c.bannableAddr.Value, req, msg.Piece)
}
if c.peerChoking {
p.cancel(req)
}
- err := func() error {
+ err = func() error {
cl.unlock()
defer cl.lock()
concurrentChunkWrites.Add(1)
if c.t.requestingPeer(r) != c {
panic("only one peer should have a given request at a time")
}
- c.t.requestState[r] = requestState{}
+ delete(c.t.requestState, r)
// c.t.iterPeers(func(p *Peer) {
// if p.isLowOnRequests() {
// p.updateRequests("Peer.deleteRequest")
}
func (c *Peer) peerPriority() (peerPriority, error) {
- return bep40Priority(c.remoteIpPort(), c.t.cl.publicAddr(c.remoteIp()))
+ return bep40Priority(c.remoteIpPort(), c.localPublicAddr)
}
func (c *Peer) remoteIp() net.IP {
return &cn._stats
}
+func (cn *Peer) Stats() *ConnStats {
+ return cn.stats()
+}
+
func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
pc, ok := p.peerImpl.(*PeerConn)
return pc, ok