X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=peerconn.go;h=e2d944ff269bb94426772d60f267bb359582d156;hb=HEAD;hp=9915758332b7ca1408e58fd566008e3bba224e19;hpb=3a92268f10184f5ed4602fa43d476d0ada96dc8e;p=btrtrc.git diff --git a/peerconn.go b/peerconn.go index 99157583..e2d944ff 100644 --- a/peerconn.go +++ b/peerconn.go @@ -12,6 +12,7 @@ import ( "net/netip" "strconv" "strings" + "sync/atomic" "time" "github.com/RoaringBitmap/roaring" @@ -54,8 +55,10 @@ type PeerConn struct { messageWriter peerConnMsgWriter - uploadTimer *time.Timer - pex pexConnState + PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber + PeerClientName atomic.Value + uploadTimer *time.Timer + pex pexConnState // The pieces the peer has claimed to have. _peerPieces roaring.Bitmap @@ -95,7 +98,6 @@ func (cn *PeerConn) pexStatus() string { }), ","), cn.pex.numPending(), ) - } } @@ -104,6 +106,7 @@ func (cn *PeerConn) peerImplStatusLines() []string { cn.connString, fmt.Sprintf("peer id: %+q", cn.PeerID), fmt.Sprintf("extensions: %v", cn.PeerExtensionBytes), + fmt.Sprintf("ltep extensions: %v", cn.PeerExtensionIDs), fmt.Sprintf("pex: %s", cn.pexStatus()), } } @@ -586,7 +589,16 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool) error { } func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) { - b, err := c.readPeerRequestData(r, prs) + // Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere, + // or fail to read and then cleanup. Also, we used to hang here if the reservation was never + // dropped, that was fixed. + ctx := context.Background() + err := prs.allocReservation.Wait(ctx) + if err != nil { + c.logger.WithDefaultLevel(log.Debug).Levelf(log.ErrorLevel(err), "waiting for alloc limit reservation: %v", err) + return + } + b, err := c.readPeerRequestData(r) c.locker().Lock() defer c.locker().Unlock() if err != nil { @@ -612,7 +624,7 @@ func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) { // https://github.com/anacrolix/torrent/issues/702#issuecomment-1000953313. logLevel = log.Debug } - c.logger.WithDefaultLevel(logLevel).Printf("error reading chunk for peer Request %v: %v", r, err) + c.logger.Levelf(logLevel, "error reading chunk for peer Request %v: %v", r, err) if c.t.closed.IsSet() { return } @@ -643,20 +655,7 @@ func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) { } } -func (c *PeerConn) readPeerRequestData(r Request, prs *peerRequestState) ([]byte, error) { - // Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere, - // or fail to read and then cleanup. - ctx := context.Background() - err := prs.allocReservation.Wait(ctx) - if err != nil { - if ctx.Err() == nil { - // The error is from the reservation itself. Something is very broken, or we're not - // guarding against excessively large requests. - err = log.WithLevel(log.Critical, err) - } - err = fmt.Errorf("waiting for alloc limit reservation: %w", err) - return nil, err - } +func (c *PeerConn) readPeerRequestData(r Request) ([]byte, error) { b := make([]byte, r.Length) p := c.t.info.Piece(int(r.Index)) n, err := c.t.readAt(b, p.Offset()+int64(r.Begin)) @@ -824,8 +823,8 @@ func (c *PeerConn) mainReadLoop() (err error) { case pp.Reject: req := newRequestFromMessage(&msg) if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req)) { - c.logger.Printf("received invalid reject [request=%v, peer=%v]", req, c) - err = fmt.Errorf("received invalid reject [request=%v]", req) + err = fmt.Errorf("received invalid reject for request %v", req) + c.logger.Levelf(log.Debug, "%v", err) } case pp.AllowedFast: torrent.Add("allowed fasts received", 1) @@ -1072,14 +1071,19 @@ func (c *PeerConn) dialAddr() PeerRemoteAddr { return netip.AddrPortFrom(addrPort.Addr(), uint16(c.PeerListenPort)) } -func (c *PeerConn) pexEvent(t pexEventType) pexEvent { +func (c *PeerConn) pexEvent(t pexEventType) (_ pexEvent, err error) { f := c.pexPeerFlags() - addr := c.dialAddr() - return pexEvent{t, addr, f, nil} + dialAddr := c.dialAddr() + addr, err := addrPortFromPeerRemoteAddr(dialAddr) + if err != nil || !addr.IsValid() { + err = fmt.Errorf("parsing dial addr %q: %w", dialAddr, err) + return + } + return pexEvent{t, addr, f, nil}, nil } func (c *PeerConn) String() string { - return fmt.Sprintf("%T %p [id=%q, exts=%v, v=%q]", c, c, c.PeerID, c.PeerExtensionBytes, c.PeerClientName.Load()) + return fmt.Sprintf("%T %p [id=%+q, exts=%v, v=%q]", c, c, c.PeerID, c.PeerExtensionBytes, c.PeerClientName.Load()) } // Returns the pieces the peer could have based on their claims. If we don't know how many pieces @@ -1102,3 +1106,26 @@ func (pc *PeerConn) remoteDialAddrPort() (netip.AddrPort, error) { func (pc *PeerConn) bitExtensionEnabled(bit pp.ExtensionBit) bool { return pc.t.cl.config.Extensions.GetBit(bit) && pc.PeerExtensionBytes.GetBit(bit) } + +func (cn *PeerConn) peerPiecesChanged() { + cn.t.maybeDropMutuallyCompletePeer(cn) +} + +// Returns whether the connection could be useful to us. We're seeding and +// they want data, we don't have metainfo and they can provide it, etc. +func (c *PeerConn) useful() bool { + t := c.t + if c.closed.IsSet() { + return false + } + if !t.haveInfo() { + return c.supportsExtension("ut_metadata") + } + if t.seeding() && c.peerInterested { + return true + } + if c.peerHasWantedPieces() { + return true + } + return false +}