"net/netip"
"strconv"
"strings"
+ "sync/atomic"
"time"
"github.com/RoaringBitmap/roaring"
messageWriter peerConnMsgWriter
- uploadTimer *time.Timer
- pex pexConnState
+ PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber
+ PeerClientName atomic.Value
+ uploadTimer *time.Timer
+ pex pexConnState
// The pieces the peer has claimed to have.
_peerPieces roaring.Bitmap
}), ","),
cn.pex.numPending(),
)
-
}
}
cn.connString,
fmt.Sprintf("peer id: %+q", cn.PeerID),
fmt.Sprintf("extensions: %v", cn.PeerExtensionBytes),
+ fmt.Sprintf("ltep extensions: %v", cn.PeerExtensionIDs),
fmt.Sprintf("pex: %s", cn.pexStatus()),
}
}
}
func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
- b, err := c.readPeerRequestData(r, prs)
+ // Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere,
+ // or fail to read and then cleanup. Also, we used to hang here if the reservation was never
+ // dropped, that was fixed.
+ ctx := context.Background()
+ err := prs.allocReservation.Wait(ctx)
+ if err != nil {
+ c.logger.WithDefaultLevel(log.Debug).Levelf(log.ErrorLevel(err), "waiting for alloc limit reservation: %v", err)
+ return
+ }
+ b, err := c.readPeerRequestData(r)
c.locker().Lock()
defer c.locker().Unlock()
if err != nil {
// https://github.com/anacrolix/torrent/issues/702#issuecomment-1000953313.
logLevel = log.Debug
}
- c.logger.WithDefaultLevel(logLevel).Printf("error reading chunk for peer Request %v: %v", r, err)
+ c.logger.Levelf(logLevel, "error reading chunk for peer Request %v: %v", r, err)
if c.t.closed.IsSet() {
return
}
}
}
-func (c *PeerConn) readPeerRequestData(r Request, prs *peerRequestState) ([]byte, error) {
- // Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere,
- // or fail to read and then cleanup.
- ctx := context.Background()
- err := prs.allocReservation.Wait(ctx)
- if err != nil {
- if ctx.Err() == nil {
- // The error is from the reservation itself. Something is very broken, or we're not
- // guarding against excessively large requests.
- err = log.WithLevel(log.Critical, err)
- }
- err = fmt.Errorf("waiting for alloc limit reservation: %w", err)
- return nil, err
- }
+func (c *PeerConn) readPeerRequestData(r Request) ([]byte, error) {
b := make([]byte, r.Length)
p := c.t.info.Piece(int(r.Index))
n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
case pp.Reject:
req := newRequestFromMessage(&msg)
if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req)) {
- c.logger.Printf("received invalid reject [request=%v, peer=%v]", req, c)
- err = fmt.Errorf("received invalid reject [request=%v]", req)
+ err = fmt.Errorf("received invalid reject for request %v", req)
+ c.logger.Levelf(log.Debug, "%v", err)
}
case pp.AllowedFast:
torrent.Add("allowed fasts received", 1)
return netip.AddrPortFrom(addrPort.Addr(), uint16(c.PeerListenPort))
}
-func (c *PeerConn) pexEvent(t pexEventType) pexEvent {
+func (c *PeerConn) pexEvent(t pexEventType) (_ pexEvent, err error) {
f := c.pexPeerFlags()
- addr := c.dialAddr()
- return pexEvent{t, addr, f, nil}
+ dialAddr := c.dialAddr()
+ addr, err := addrPortFromPeerRemoteAddr(dialAddr)
+ if err != nil || !addr.IsValid() {
+ err = fmt.Errorf("parsing dial addr %q: %w", dialAddr, err)
+ return
+ }
+ return pexEvent{t, addr, f, nil}, nil
}
func (c *PeerConn) String() string {
- return fmt.Sprintf("%T %p [id=%q, exts=%v, v=%q]", c, c, c.PeerID, c.PeerExtensionBytes, c.PeerClientName.Load())
+ return fmt.Sprintf("%T %p [id=%+q, exts=%v, v=%q]", c, c, c.PeerID, c.PeerExtensionBytes, c.PeerClientName.Load())
}
// Returns the pieces the peer could have based on their claims. If we don't know how many pieces
func (pc *PeerConn) bitExtensionEnabled(bit pp.ExtensionBit) bool {
return pc.t.cl.config.Extensions.GetBit(bit) && pc.PeerExtensionBytes.GetBit(bit)
}
+
+func (cn *PeerConn) peerPiecesChanged() {
+ cn.t.maybeDropMutuallyCompletePeer(cn)
+}
+
+// Returns whether the connection could be useful to us. We're seeding and
+// they want data, we don't have metainfo and they can provide it, etc.
+func (c *PeerConn) useful() bool {
+ t := c.t
+ if c.closed.IsSet() {
+ return false
+ }
+ if !t.haveInfo() {
+ return c.supportsExtension("ut_metadata")
+ }
+ if t.seeding() && c.peerInterested {
+ return true
+ }
+ if c.peerHasWantedPieces() {
+ return true
+ }
+ return false
+}