]> Sergey Matveev's repositories - btrtrc.git/blobdiff - peerconn.go
Drop support for go 1.20
[btrtrc.git] / peerconn.go
index 0318cb6e33d7e7df3f2e63f03f715c937c437f8d..e2d944ff269bb94426772d60f267bb359582d156 100644 (file)
@@ -9,15 +9,19 @@ import (
        "io"
        "math/rand"
        "net"
+       "net/netip"
        "strconv"
        "strings"
+       "sync/atomic"
        "time"
 
        "github.com/RoaringBitmap/roaring"
+       "github.com/anacrolix/generics"
        . "github.com/anacrolix/generics"
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/multiless"
+       "golang.org/x/exp/maps"
        "golang.org/x/time/rate"
 
        "github.com/anacrolix/torrent/bencode"
@@ -25,6 +29,7 @@ import (
        "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/mse"
        pp "github.com/anacrolix/torrent/peer_protocol"
+       utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
 )
 
 // Maintains the state of a BitTorrent-protocol based connection with a peer.
@@ -38,6 +43,7 @@ type PeerConn struct {
        // See BEP 3 etc.
        PeerID             PeerID
        PeerExtensionBytes pp.PeerExtensionBits
+       PeerListenPort     int
 
        // The actual Conn, used for closing, and setting socket options. Do not use methods on this
        // while holding any mutexes.
@@ -49,8 +55,10 @@ type PeerConn struct {
 
        messageWriter peerConnMsgWriter
 
-       uploadTimer *time.Timer
-       pex         pexConnState
+       PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber
+       PeerClientName   atomic.Value
+       uploadTimer      *time.Timer
+       pex              pexConnState
 
        // The pieces the peer has claimed to have.
        _peerPieces roaring.Bitmap
@@ -59,10 +67,48 @@ type PeerConn struct {
        peerSentHaveAll bool
 
        peerRequestDataAllocLimiter alloclim.Limiter
+
+       outstandingHolepunchingRendezvous map[netip.AddrPort]struct{}
+}
+
+func (cn *PeerConn) pexStatus() string {
+       if !cn.bitExtensionEnabled(pp.ExtensionBitLtep) {
+               return "extended protocol disabled"
+       }
+       if cn.PeerExtensionIDs == nil {
+               return "pending extended handshake"
+       }
+       if !cn.supportsExtension(pp.ExtensionNamePex) {
+               return "unsupported"
+       }
+       if true {
+               return fmt.Sprintf(
+                       "%v conns, %v unsent events",
+                       len(cn.pex.remoteLiveConns),
+                       cn.pex.numPending(),
+               )
+       } else {
+               // This alternative branch prints out the remote live conn addresses.
+               return fmt.Sprintf(
+                       "%v conns, %v unsent events",
+                       strings.Join(generics.SliceMap(
+                               maps.Keys(cn.pex.remoteLiveConns),
+                               func(from netip.AddrPort) string {
+                                       return from.String()
+                               }), ","),
+                       cn.pex.numPending(),
+               )
+       }
 }
 
 func (cn *PeerConn) peerImplStatusLines() []string {
-       return []string{fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString)}
+       return []string{
+               cn.connString,
+               fmt.Sprintf("peer id: %+q", cn.PeerID),
+               fmt.Sprintf("extensions: %v", cn.PeerExtensionBytes),
+               fmt.Sprintf("ltep extensions: %v", cn.PeerExtensionIDs),
+               fmt.Sprintf("pex: %s", cn.pexStatus()),
+       }
 }
 
 // Returns true if the connection is over IPv6.
@@ -74,10 +120,12 @@ func (cn *PeerConn) ipv6() bool {
        return len(ip) == net.IPv6len
 }
 
-// Returns true the if the dialer/initiator has the lower client peer ID. TODO: Find the
-// specification for this.
+// Returns true the if the dialer/initiator has the higher client peer ID. See
+// https://github.com/arvidn/libtorrent/blame/272828e1cc37b042dfbbafa539222d8533e99755/src/bt_peer_connection.cpp#L3536-L3557.
+// As far as I can tell, Transmission just keeps the oldest connection.
 func (cn *PeerConn) isPreferredDirection() bool {
-       return bytes.Compare(cn.t.cl.peerID[:], cn.PeerID[:]) < 0 == cn.outgoing
+       // True if our client peer ID is higher than the remote's peer ID.
+       return bytes.Compare(cn.PeerID[:], cn.t.cl.peerID[:]) < 0 == cn.outgoing
 }
 
 // Returns whether the left connection should be preferred over the right one,
@@ -541,7 +589,16 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
 }
 
 func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
-       b, err := c.readPeerRequestData(r, prs)
+       // Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere,
+       // or fail to read and then cleanup. Also, we used to hang here if the reservation was never
+       // dropped, that was fixed.
+       ctx := context.Background()
+       err := prs.allocReservation.Wait(ctx)
+       if err != nil {
+               c.logger.WithDefaultLevel(log.Debug).Levelf(log.ErrorLevel(err), "waiting for alloc limit reservation: %v", err)
+               return
+       }
+       b, err := c.readPeerRequestData(r)
        c.locker().Lock()
        defer c.locker().Unlock()
        if err != nil {
@@ -567,7 +624,7 @@ func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) {
                // https://github.com/anacrolix/torrent/issues/702#issuecomment-1000953313.
                logLevel = log.Debug
        }
-       c.logger.WithDefaultLevel(logLevel).Printf("error reading chunk for peer Request %v: %v", r, err)
+       c.logger.Levelf(logLevel, "error reading chunk for peer Request %v: %v", r, err)
        if c.t.closed.IsSet() {
                return
        }
@@ -598,20 +655,7 @@ func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) {
        }
 }
 
-func (c *PeerConn) readPeerRequestData(r Request, prs *peerRequestState) ([]byte, error) {
-       // Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere,
-       // or fail to read and then cleanup.
-       ctx := context.Background()
-       err := prs.allocReservation.Wait(ctx)
-       if err != nil {
-               if ctx.Err() == nil {
-                       // The error is from the reservation itself. Something is very broken, or we're not
-                       // guarding against excessively large requests.
-                       err = log.WithLevel(log.Critical, err)
-               }
-               err = fmt.Errorf("waiting for alloc limit reservation: %w", err)
-               return nil, err
-       }
+func (c *PeerConn) readPeerRequestData(r Request) ([]byte, error) {
        b := make([]byte, r.Length)
        p := c.t.info.Piece(int(r.Index))
        n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
@@ -779,8 +823,8 @@ func (c *PeerConn) mainReadLoop() (err error) {
                case pp.Reject:
                        req := newRequestFromMessage(&msg)
                        if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req)) {
-                               c.logger.Printf("received invalid reject [request=%v, peer=%v]", req, c)
-                               err = fmt.Errorf("received invalid reject [request=%v]", req)
+                               err = fmt.Errorf("received invalid reject for request %v", req)
+                               c.logger.Levelf(log.Debug, "%v", err)
                        }
                case pp.AllowedFast:
                        torrent.Add("allowed fasts received", 1)
@@ -848,6 +892,7 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err
                c.requestPendingMetadata()
                if !t.cl.config.DisablePEX {
                        t.pex.Add(c) // we learnt enough now
+                       // This checks the extension is supported internally.
                        c.pex.Init(c)
                }
                return nil
@@ -861,7 +906,20 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err
                if !c.pex.IsEnabled() {
                        return nil // or hang-up maybe?
                }
-               return c.pex.Recv(payload)
+               err = c.pex.Recv(payload)
+               if err != nil {
+                       err = fmt.Errorf("receiving pex message: %w", err)
+               }
+               return
+       case utHolepunchExtendedId:
+               var msg utHolepunch.Msg
+               err = msg.UnmarshalBinary(payload)
+               if err != nil {
+                       err = fmt.Errorf("unmarshalling ut_holepunch message: %w", err)
+                       return
+               }
+               err = c.t.handleReceivedUtHolepunchMsg(msg, c)
+               return
        default:
                return fmt.Errorf("unexpected extended message ID: %v", id)
        }
@@ -997,29 +1055,35 @@ func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags {
 // This returns the address to use if we want to dial the peer again. It incorporates the peer's
 // advertised listen port.
 func (c *PeerConn) dialAddr() PeerRemoteAddr {
-       if !c.outgoing && c.PeerListenPort != 0 {
-               switch addr := c.RemoteAddr.(type) {
-               case *net.TCPAddr:
-                       dialAddr := *addr
-                       dialAddr.Port = c.PeerListenPort
-                       return &dialAddr
-               case *net.UDPAddr:
-                       dialAddr := *addr
-                       dialAddr.Port = c.PeerListenPort
-                       return &dialAddr
-               }
+       if c.outgoing || c.PeerListenPort == 0 {
+               return c.RemoteAddr
        }
-       return c.RemoteAddr
+       addrPort, err := addrPortFromPeerRemoteAddr(c.RemoteAddr)
+       if err != nil {
+               c.logger.Levelf(
+                       log.Warning,
+                       "error parsing %q for alternate dial port: %v",
+                       c.RemoteAddr,
+                       err,
+               )
+               return c.RemoteAddr
+       }
+       return netip.AddrPortFrom(addrPort.Addr(), uint16(c.PeerListenPort))
 }
 
-func (c *PeerConn) pexEvent(t pexEventType) pexEvent {
+func (c *PeerConn) pexEvent(t pexEventType) (_ pexEvent, err error) {
        f := c.pexPeerFlags()
-       addr := c.dialAddr()
-       return pexEvent{t, addr, f, nil}
+       dialAddr := c.dialAddr()
+       addr, err := addrPortFromPeerRemoteAddr(dialAddr)
+       if err != nil || !addr.IsValid() {
+               err = fmt.Errorf("parsing dial addr %q: %w", dialAddr, err)
+               return
+       }
+       return pexEvent{t, addr, f, nil}, nil
 }
 
 func (c *PeerConn) String() string {
-       return fmt.Sprintf("%T %p [id=%q, exts=%v, v=%q]", c, c, c.PeerID, c.PeerExtensionBytes, c.PeerClientName.Load())
+       return fmt.Sprintf("%T %p [id=%+q, exts=%v, v=%q]", c, c, c.PeerID, c.PeerExtensionBytes, c.PeerClientName.Load())
 }
 
 // Returns the pieces the peer could have based on their claims. If we don't know how many pieces
@@ -1033,3 +1097,35 @@ func (cn *PeerConn) PeerPieces() *roaring.Bitmap {
 func (pc *PeerConn) remoteIsTransmission() bool {
        return bytes.HasPrefix(pc.PeerID[:], []byte("-TR")) && pc.PeerID[7] == '-'
 }
+
+func (pc *PeerConn) remoteDialAddrPort() (netip.AddrPort, error) {
+       dialAddr := pc.dialAddr()
+       return addrPortFromPeerRemoteAddr(dialAddr)
+}
+
+func (pc *PeerConn) bitExtensionEnabled(bit pp.ExtensionBit) bool {
+       return pc.t.cl.config.Extensions.GetBit(bit) && pc.PeerExtensionBytes.GetBit(bit)
+}
+
+func (cn *PeerConn) peerPiecesChanged() {
+       cn.t.maybeDropMutuallyCompletePeer(cn)
+}
+
+// Returns whether the connection could be useful to us. We're seeding and
+// they want data, we don't have metainfo and they can provide it, etc.
+func (c *PeerConn) useful() bool {
+       t := c.t
+       if c.closed.IsSet() {
+               return false
+       }
+       if !t.haveInfo() {
+               return c.supportsExtension("ut_metadata")
+       }
+       if t.seeding() && c.peerInterested {
+               return true
+       }
+       if c.peerHasWantedPieces() {
+               return true
+       }
+       return false
+}